From 2ae1721a80d268984b6ad8469b06651df2e8936e Mon Sep 17 00:00:00 2001 From: VitthalMagadum Date: Thu, 26 Sep 2024 07:35:54 -0400 Subject: [PATCH 1/4] iisue_822 Added TC for SNMP Notification host --- anta/tests/snmp.py | 127 +++++++++++++++++++++++++++- examples/tests.yaml | 14 +++ tests/units/anta_tests/test_snmp.py | 93 +++++++++++++++++++- 3 files changed, 230 insertions(+), 4 deletions(-) diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index c7329b6d7..626b4db22 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -7,11 +7,14 @@ # mypy: disable-error-code=attr-defined from __future__ import annotations -from typing import TYPE_CHECKING, ClassVar +from ipaddress import IPv4Address +from typing import TYPE_CHECKING, ClassVar, Literal -from anta.custom_types import PositiveInteger +from pydantic import BaseModel, model_validator + +from anta.custom_types import Port, PositiveInteger from anta.models import AntaCommand, AntaTest -from anta.tools import get_value +from anta.tools import get_failed_logs, get_item, get_value if TYPE_CHECKING: from anta.models import AntaTemplate @@ -237,3 +240,121 @@ def test(self) -> None: self.result.is_failure(f"Expected `{self.inputs.contact}` as the contact, but found `{contact}` instead.") else: self.result.is_success() + + +class VerifySNMPNotificationHost(AntaTest): + """Verifies the SNMP notification host (SNMP manager) configurations. + + Expected Results + ---------------- + * Success: The test will pass if the SNMP PDU counter(s) are non-zero/greater than zero. + * Failure: The test will fail if the SNMP PDU counter(s) are zero/None/Not Found. + + Examples + -------- + ```yaml + anta.tests.snmp: + - VerifySNMPNotificationHost: + notification_hosts: + - hostname: 192.168.1.100 + vrf: default + notification_type: trap + version: v1 + udp_port: 162 + community_string: public + user: public + ``` + """ + + name = "VerifySNMPNotificationHost" + description = "Verifies the SNMP notification host (SNMP manager) configurations." + categories: ClassVar[list[str]] = ["snmp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show snmp notification host", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifySNMPNotificationHost test.""" + + notification_hosts: list[SNMPHost] + """List of SNMP hosts.""" + + class SNMPHost(BaseModel): + """Model for a SNMP Manager.""" + + hostname: IPv4Address + """IP address of the SNMP notification host.""" + vrf: str = "default" + """Optional VRF for SNMP Hosts. If not provided, it defaults to `default`.""" + notification_type: Literal["trap", "inform"] + """Type of SNMP notification (trap or inform).""" + version: Literal["v1", "v2c", "v3"] + """SNMP protocol version.""" + udp_port: Port | int = 162 + """UDP port for SNMP. If not provided then defaults to 162.""" + community_string: str | None = None + """Optional SNMP community string for authentication.""" + user: str | None = None + """Optional SNMP user for authentication.""" + + @model_validator(mode="after") + def validate_inputs(self: BaseModel) -> BaseModel: + """Validate the inputs provided to the SNMPHost class. + + If SNMP version is either v1 or v2c, community string must be provided. + + If SNMP version is v3, user must be provided. + """ + if self.version in ["v1", "v2c"] and self.community_string is None: + msg = "Community string must be provided when SNMP Protocol version is either v1 or v2c." + raise ValueError(msg) + if self.version == "v3" and self.user is None: + msg = "User must be provided when SNMP Protocol version is v3." + raise ValueError(msg) + return self + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifySNMPNotificationHost.""" + failures: str = "" + + for host in self.inputs.notification_hosts: + hostname = str(host.hostname) + vrf = host.vrf + version = host.version + notification_type = host.notification_type + udp_port = host.udp_port + community_string = host.community_string + user = host.user + + # Verify SNMP host details. + if not (host_details := get_item(self.instance_commands[0].json_output["hosts"], "hostname", hostname)): + failures += f"Details not found for SNMP host '{hostname}'.\n" + continue + + # Update expected host details. + expected_host_details = {"vrf": vrf, "notification type": notification_type, "udp port": udp_port} + + # Update actual host details. + actual_host_details = {"notification type": host_details.get("notificationType", "Not Found"), "udp port": host_details.get("port", "Not Found")} + + # Verify SNMP protocol version. + if version in ["v1", "v2c"]: + expected_host_details["community_string"] = community_string + actual_host_details["community_string"] = host_details.get("v1v2cParams", {}).get("communityString", "Not Found") + + if version == "v3": + expected_host_details["user"] = user + actual_host_details["user"] = host_details.get("v3Params", {}).get("user", "Not Found") + + # Verify the VRF for SNMP Hosts. If vrf is default then command output consists empty string. + actual_host_details["vrf"] = "default" if (vrf_name := host_details.get("vrf")) == "" else "Not Found" if vrf_name is None else vrf_name + + # Collecting failures logs if any. + failure_logs = get_failed_logs(expected_host_details, actual_host_details) + if failure_logs: + failures += f"For SNMP host {hostname}:{failure_logs}\n" + + # Check if there are any failures. + if not failures: + self.result.is_success() + else: + self.result.is_failure(failures) diff --git a/examples/tests.yaml b/examples/tests.yaml index ade4e7640..833d6615a 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -398,6 +398,20 @@ anta.tests.snmp: location: New York - VerifySnmpContact: contact: Jon@example.com + - VerifySNMPNotificationHost: + notification_hosts: + - hostname: 192.168.1.100 + vrf: default + notification_type: trap + version: v3 + udp_port: 162 + user: public + - hostname: 192.168.1.101 + vrf: default + notification_type: trap + version: v2c + udp_port: 162 + community_string: public anta.tests.software: - VerifyEOSVersion: diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index f6d964f83..b10db5bf5 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -7,7 +7,7 @@ from typing import Any -from anta.tests.snmp import VerifySnmpContact, VerifySnmpIPv4Acl, VerifySnmpIPv6Acl, VerifySnmpLocation, VerifySnmpStatus +from anta.tests.snmp import VerifySnmpContact, VerifySnmpIPv4Acl, VerifySnmpIPv6Acl, VerifySnmpLocation, VerifySNMPNotificationHost, VerifySnmpStatus from tests.units.anta_tests import test DATA: list[dict[str, Any]] = [ @@ -152,4 +152,95 @@ "messages": ["SNMP contact is not configured."], }, }, + { + "name": "success", + "test": VerifySNMPNotificationHost, + "eos_data": [ + { + "hosts": [ + { + "hostname": "192.168.1.100", + "port": 162, + "vrf": "", + "notificationType": "trap", + "protocolVersion": "v3", + "v3Params": {"user": "public", "securityLevel": "authNoPriv"}, + }, + { + "hostname": "192.168.1.101", + "port": 162, + "vrf": "", + "notificationType": "trap", + "protocolVersion": "v2c", + "v1v2cParams": {"communityString": "public"}, + }, + ] + } + ], + "inputs": { + "notification_hosts": [ + {"hostname": "192.168.1.100", "vrf": "default", "notification_type": "trap", "version": "v3", "udp_port": 162, "user": "public"}, + {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-details-not-found", + "test": VerifySNMPNotificationHost, + "eos_data": [{"hosts": []}], + "inputs": { + "notification_hosts": [ + {"hostname": "192.168.1.100", "vrf": "default", "notification_type": "trap", "version": "v3", "udp_port": 162, "user": "public"}, + {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, + ] + }, + "expected": {"result": "failure", "messages": ["Details not found for SNMP host '192.168.1.100'.\nDetails not found for SNMP host '192.168.1.101'.\n"]}, + }, + { + "name": "failure-incorrect-config", + "test": VerifySNMPNotificationHost, + "eos_data": [ + { + "hosts": [ + { + "hostname": "192.168.1.100", + "port": 163, + "vrf": "", + "notificationType": "inform", + "protocolVersion": "v3", + "v3Params": {"user": "public1", "securityLevel": "authNoPriv"}, + }, + { + "hostname": "192.168.1.101", + "port": 163, + "vrf": "MGMT", + "notificationType": "inform", + "protocolVersion": "v2c", + "v1v2cParams": {"communityString": "public1"}, + }, + ] + } + ], + "inputs": { + "notification_hosts": [ + {"hostname": "192.168.1.100", "vrf": "default", "notification_type": "trap", "version": "v3", "udp_port": 162, "user": "public"}, + {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "For SNMP host 192.168.1.100:\n" + "Expected `trap` as the notification type, but found `inform` instead.\n" + "Expected `162` as the udp port, but found `163` instead.\n" + "Expected `public` as the user, but found `public1` instead.\n" + "For SNMP host 192.168.1.101:\n" + "Expected `default` as the vrf, but found `MGMT` instead.\n" + "Expected `trap` as the notification type, but found `inform` instead.\n" + "Expected `162` as the udp port, but found `163` instead.\n" + "Expected `public` as the community_string, but found `public1` instead.\n" + ], + }, + }, ] From 21a3786c0393bde0d0165985860fc2d32d4deb9c Mon Sep 17 00:00:00 2001 From: VitthalMagadum Date: Fri, 27 Sep 2024 02:16:27 -0400 Subject: [PATCH 2/4] issue_822 Added fix to compensate sonarlint issue --- anta/tests/snmp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index 626b4db22..e2e09efe4 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -346,7 +346,7 @@ def test(self) -> None: actual_host_details["user"] = host_details.get("v3Params", {}).get("user", "Not Found") # Verify the VRF for SNMP Hosts. If vrf is default then command output consists empty string. - actual_host_details["vrf"] = "default" if (vrf_name := host_details.get("vrf")) == "" else "Not Found" if vrf_name is None else vrf_name + actual_host_details["vrf"] = "default" if (vrf_name := host_details.get("vrf", "Not Found")) == "" else vrf_name # Collecting failures logs if any. failure_logs = get_failed_logs(expected_host_details, actual_host_details) From 4261605cf9229fd1ef400ed5e8a06f8d003e9f60 Mon Sep 17 00:00:00 2001 From: VitthalMagadum Date: Mon, 30 Sep 2024 07:26:23 -0400 Subject: [PATCH 3/4] issue_822 Handling review comments: updated failure msgs and docstring --- anta/tests/snmp.py | 24 ++++++++++++++++-------- tests/units/anta_tests/test_snmp.py | 29 +++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index e2e09efe4..9d8b2dfa6 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -245,10 +245,14 @@ def test(self) -> None: class VerifySNMPNotificationHost(AntaTest): """Verifies the SNMP notification host (SNMP manager) configurations. + - Verifies that the valid notification type and VRF name. + - Ensures that UDP port provided matches the expected value. + - Ensures that the community_string is properly set for SNMP v1/v2 and for SNMP v3, the user field is included, aligning with version-specific requirements. + Expected Results ---------------- - * Success: The test will pass if the SNMP PDU counter(s) are non-zero/greater than zero. - * Failure: The test will fail if the SNMP PDU counter(s) are zero/None/Not Found. + * Success: The test will pass if the provided SNMP notification host and all specified parameters are correctly configured. + * Failure: The test will fail if the provided SNMP notification host is not configured or specified parameters are not correctly configured. Examples -------- @@ -281,7 +285,7 @@ class SNMPHost(BaseModel): """Model for a SNMP Manager.""" hostname: IPv4Address - """IP address of the SNMP notification host.""" + """IPv4 address of the SNMP notification host.""" vrf: str = "default" """Optional VRF for SNMP Hosts. If not provided, it defaults to `default`.""" notification_type: Literal["trap", "inform"] @@ -314,8 +318,14 @@ def validate_inputs(self: BaseModel) -> BaseModel: @AntaTest.anta_test def test(self) -> None: """Main test function for VerifySNMPNotificationHost.""" + self.result.is_success() failures: str = "" + # Verify SNMP host details. + if not (snmp_hosts := get_value(self.instance_commands[0].json_output, "hosts")): + self.result.is_failure("SNMP is not configured.") + return + for host in self.inputs.notification_hosts: hostname = str(host.hostname) vrf = host.vrf @@ -326,8 +336,8 @@ def test(self) -> None: user = host.user # Verify SNMP host details. - if not (host_details := get_item(self.instance_commands[0].json_output["hosts"], "hostname", hostname)): - failures += f"Details not found for SNMP host '{hostname}'.\n" + if not (host_details := get_item(snmp_hosts, "hostname", hostname)): + failures += f"SNMP host '{hostname}' is not configured.\n" continue # Update expected host details. @@ -354,7 +364,5 @@ def test(self) -> None: failures += f"For SNMP host {hostname}:{failure_logs}\n" # Check if there are any failures. - if not failures: - self.result.is_success() - else: + if failures: self.result.is_failure(failures) diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index b10db5bf5..3177aceb1 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -186,7 +186,7 @@ "expected": {"result": "success"}, }, { - "name": "failure-details-not-found", + "name": "failure-not-configured", "test": VerifySNMPNotificationHost, "eos_data": [{"hosts": []}], "inputs": { @@ -195,7 +195,32 @@ {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, ] }, - "expected": {"result": "failure", "messages": ["Details not found for SNMP host '192.168.1.100'.\nDetails not found for SNMP host '192.168.1.101'.\n"]}, + "expected": {"result": "failure", "messages": ["SNMP is not configured."]}, + }, + { + "name": "failure-details-not-found", + "test": VerifySNMPNotificationHost, + "eos_data": [ + { + "hosts": [ + { + "hostname": "192.168.1.100", + "port": 162, + "vrf": "", + "notificationType": "trap", + "protocolVersion": "v3", + "v3Params": {"user": "public", "securityLevel": "authNoPriv"}, + }, + ] + } + ], + "inputs": { + "notification_hosts": [ + {"hostname": "192.168.1.100", "vrf": "default", "notification_type": "trap", "version": "v3", "udp_port": 162, "user": "public"}, + {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, + ] + }, + "expected": {"result": "failure", "messages": ["SNMP host '192.168.1.101' is not configured.\n"]}, }, { "name": "failure-incorrect-config", From da8ba279af4d37a57f7e74806d74492fd111c259 Mon Sep 17 00:00:00 2001 From: VitthalMagadum Date: Thu, 3 Oct 2024 06:04:04 -0400 Subject: [PATCH 4/4] issue_822 Handling review comments: updated docstring, custom_types --- anta/custom_types.py | 1 + anta/tests/snmp.py | 8 ++++---- tests/units/anta_tests/test_snmp.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/anta/custom_types.py b/anta/custom_types.py index c1e1f6428..4e8972d70 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -204,3 +204,4 @@ def validate_regex(value: str) -> str: ] BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"] BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"] +SnmpVersion = Literal["v1", "v2c", "v3"] diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index 9d8b2dfa6..66b849ce9 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -12,7 +12,7 @@ from pydantic import BaseModel, model_validator -from anta.custom_types import Port, PositiveInteger +from anta.custom_types import Port, PositiveInteger, SnmpVersion from anta.models import AntaCommand, AntaTest from anta.tools import get_failed_logs, get_item, get_value @@ -282,7 +282,7 @@ class Input(AntaTest.Input): """List of SNMP hosts.""" class SNMPHost(BaseModel): - """Model for a SNMP Manager.""" + """Model for a SNMP Host.""" hostname: IPv4Address """IPv4 address of the SNMP notification host.""" @@ -290,7 +290,7 @@ class SNMPHost(BaseModel): """Optional VRF for SNMP Hosts. If not provided, it defaults to `default`.""" notification_type: Literal["trap", "inform"] """Type of SNMP notification (trap or inform).""" - version: Literal["v1", "v2c", "v3"] + version: SnmpVersion """SNMP protocol version.""" udp_port: Port | int = 162 """UDP port for SNMP. If not provided then defaults to 162.""" @@ -323,7 +323,7 @@ def test(self) -> None: # Verify SNMP host details. if not (snmp_hosts := get_value(self.instance_commands[0].json_output, "hosts")): - self.result.is_failure("SNMP is not configured.") + self.result.is_failure("No SNMP host is configured.") return for host in self.inputs.notification_hosts: diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index 3177aceb1..c2b5a1234 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -195,7 +195,7 @@ {"hostname": "192.168.1.101", "vrf": "default", "notification_type": "trap", "version": "v2c", "udp_port": 162, "community_string": "public"}, ] }, - "expected": {"result": "failure", "messages": ["SNMP is not configured."]}, + "expected": {"result": "failure", "messages": ["No SNMP host is configured."]}, }, { "name": "failure-details-not-found",