From 0f74a59ae5752a202e5f2fabbf0504efcdede877 Mon Sep 17 00:00:00 2001 From: VitthalMagadum Date: Tue, 13 Aug 2024 09:42:16 -0400 Subject: [PATCH] issue_781 handling review comments: added helper function for capabilities --- anta/tests/routing/bgp.py | 75 +++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index aaf711676..cc6a70488 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -144,6 +144,50 @@ def _add_bgp_routes_failure( return failure_routes +def _check_peer_cpabilities(bgp_output: dict[str, Any], capabilities: list[MultiProtocolCaps], *, strict: bool = False) -> dict[str, Any]: + """Check for issues in BGP output for multiprotocol capabilities. + + - Checks the only mentioned capabilities should be listed when strict is True. + - Checks the any capability(s) are missing from output. + - Checks the any capability(s) are not advertised, received, or enabled + + Parameters + ---------- + bgp_output: The BGP output from the device. + capabilities: List of multiprotocol capabilities to be verified. + strict: Optional check to verifies the only mentioned capabilities should be listed, otherwise test should fail. It defaults to false + + The * indicates that all arguments following it must be provided as keyword arguments. To compensate boolean-type-hint-positional-argument (FBT001). + + Returns + ------- + dict[str, Any]: A dictionary containing the failure message as per strict check. + + """ + # Prepare the failure dictionary + failure: dict[str, Any] = {} + + # Verifies the only mentioned capabilities should be listed when strict is True. + if strict and not set(bgp_output).issubset(set(capabilities)): + other_capabilities = ", ".join(list(set(bgp_output) - set(capabilities))) + failure["status"] = f"Other than mentioned capabilities following capability(s) are listed: {other_capabilities}" + return failure + + # Check each capability + for capability in capabilities: + capability_output = bgp_output.get(capability) + + # Check if capabilities are missing + if not capability_output: + failure[capability] = "not found" + + # Check if capabilities are not advertised, received, or enabled + elif not all(capability_output.get(prop, False) for prop in ["advertised", "received", "enabled"]): + failure[capability] = capability_output + + return failure + + class VerifyBGPPeerCount(AntaTest): """Verifies the count of BGP peers for a given address family. @@ -726,7 +770,7 @@ class BgpPeer(BaseModel): vrf: str = "default" """Optional VRF for BGP peer. If not provided, it defaults to `default`.""" strict: bool = False - """Optional check for BGP peer. Verifies the only mentioned capabilities should be listed, otherwise test should fail.""" + """Optional check to verifies the only mentioned capabilities should be listed, otherwise test should fail. It defaults to false.""" capabilities: list[MultiProtocolCaps] """List of multiprotocol capabilities to be verified.""" @@ -735,14 +779,14 @@ def test(self) -> None: """Main test function for VerifyBGPPeerMPCaps.""" failures: dict[str, Any] = {"bgp_peers": {}} - # Iterate over each bgp peer + # Iterate over each bgp peer. for bgp_peer in self.inputs.bgp_peers: peer = str(bgp_peer.peer_address) vrf = bgp_peer.vrf capabilities = bgp_peer.capabilities failure: dict[str, dict[str, dict[str, Any]]] = {"bgp_peers": {peer: {vrf: {}}}} - # Check if BGP output exists + # Check if BGP output exists. if ( not (bgp_output := get_value(self.instance_commands[0].json_output, f"vrfs.{vrf}.peerList")) or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None @@ -751,29 +795,14 @@ def test(self) -> None: failures = deep_update(failures, failure) continue - # Fetching the capabilities output + # Fetching the capabilities output. bgp_output = get_value(bgp_output, "neighborCapabilities.multiprotocolCaps") - # Verifies the only mentioned capabilities should be listed in case needed. - if bgp_peer.strict and not set(bgp_output).issubset(set(capabilities)): - other_capabilities = ", ".join(list(set(bgp_output) - set(capabilities))) - failure["bgp_peers"][peer][vrf] = {"status": f"Other than mentioned capabilities following capability(s) are listed: {other_capabilities}"} + # Collecting the failure logs if any. + failure_log = _check_peer_cpabilities(bgp_output, capabilities, strict=bgp_peer.strict) + if failure_log: + failure["bgp_peers"][peer][vrf] = failure_log failures = deep_update(failures, failure) - continue - - # Check each capability - for capability in capabilities: - capability_output = bgp_output.get(capability) - - # Check if capabilities are missing - if not capability_output: - failure["bgp_peers"][peer][vrf][capability] = "not found" - failures = deep_update(failures, failure) - - # Check if capabilities are not advertised, received, or enabled - elif not all(capability_output.get(prop, False) for prop in ["advertised", "received", "enabled"]): - failure["bgp_peers"][peer][vrf][capability] = capability_output - failures = deep_update(failures, failure) # Check if there are any failures if not failures["bgp_peers"]: