From 712f3380b3546c48c2fe8300cf7ff7ca0d601a0f Mon Sep 17 00:00:00 2001 From: SuvarnaMeenakshi <50386592+SuvarnaMeenakshi@users.noreply.github.com> Date: Thu, 13 Jul 2023 19:03:47 -0700 Subject: [PATCH] [202205]Use device loopback IP address to send SNMP query from neighboring ceos or vsonic (#8802) (#8972) What is the motivation for this PR? cherry-pick of #8802 sonic-net/sonic-buildimage#15487 modifies snmpd in SONiC to listen on management and loopback ip by default instead of listening on any IP. test_interop_protocol.py::test_snmp sends a query to the neighbor device using the link ip address. After the above change to listen on management and loopback ip, the snmp query will fail. Hence, modifying the test to send SNMP query to SONiC DUT from neighboring vsonic/eos using Loopback IP of SONiC DUT. How did you do it? Get loopback IPv4 address DUT and use that to query from neighbor device. Added a generic helped function to send SNMP query to DUT from a neighbor. (cherry picked from commit d66965a) How did you verify/test it? Tested on single asic VS testbed. Signed-off-by: Suvarna Meenakshi --- tests/common/helpers/snmp_helpers.py | 45 +++++++++++++++++++++++++++ tests/macsec/test_interop_protocol.py | 36 +++++++++++---------- tests/snmp/test_snmp_loopback.py | 40 +++++++----------------- 3 files changed, 76 insertions(+), 45 deletions(-) diff --git a/tests/common/helpers/snmp_helpers.py b/tests/common/helpers/snmp_helpers.py index 5e92494e4a..77ce7cfee2 100644 --- a/tests/common/helpers/snmp_helpers.py +++ b/tests/common/helpers/snmp_helpers.py @@ -1,8 +1,10 @@ import logging +import ipaddress from tests.common.utilities import wait_until from tests.common.errors import RunAnsibleModuleFail from tests.common.helpers.assertions import pytest_assert +from tests.common.devices.eos import EosHost logger = logging.getLogger(__name__) @@ -40,3 +42,46 @@ def get_snmp_facts(localhost, host, version, community, is_dell=False, module_ig pytest_assert(wait_until(timeout, interval, 0, _update_snmp_facts, localhost, host, version, community, is_dell, include_swap), "Timeout waiting for SNMP facts") return global_snmp_facts + + +def get_snmp_output(ip, duthost, nbr, creds_all_duts, oid='.1.3.6.1.2.1.1.1.0'): + """ + Get snmp output from duthost using specific ip to query + snmp query is sent from neighboring ceos/vsonic + + Args: + ip(str): IP of dut to be used to send SNMP query + duthost: duthost + nbr: from where the snmp query should be executed + creds_all_duts: creds to get snmp_rocommunity of duthost + oid: to query + + Returns: + SNMP result + """ + ipaddr = ipaddress.ip_address(ip) + iptables_cmd = "iptables" + + # TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule. + if isinstance(ipaddr, ipaddress.IPv6Address): + iptables_cmd = "ip6tables" + return None + + ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format( + iptables_cmd, ip) + duthost.shell(ip_tbl_rule_add) + + if isinstance(nbr["host"], EosHost): + eos_snmpget = "bash snmpget -v2c -c {} {} {}".format( + creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid) + out = nbr['host'].eos_command(commands=[eos_snmpget]) + else: + command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format( + creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid) + out = nbr['host'].command(command) + + ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format( + iptables_cmd, ip) + duthost.shell(ip_tbl_rule_del) + + return out diff --git a/tests/macsec/test_interop_protocol.py b/tests/macsec/test_interop_protocol.py index 15937e28ae..0509ff2cb8 100644 --- a/tests/macsec/test_interop_protocol.py +++ b/tests/macsec/test_interop_protocol.py @@ -1,17 +1,17 @@ from time import sleep import pytest import logging -import re import scapy.all as scapy import ptf.testutils as testutils from collections import Counter +import ipaddress from tests.common.utilities import wait_until -from tests.common.devices.eos import EosHost from tests.common import config_reload from macsec_helper import * from macsec_config_helper import * from macsec_platform_helper import * +from tests.common.helpers.snmp_helpers import get_snmp_output logger = logging.getLogger(__name__) @@ -121,24 +121,28 @@ def check_bgp_established(ctrl_port, up_link): assert wait_until(BGP_TIMEOUT, BGP_KEEPALIVE, BGP_HOLDTIME, check_bgp_established, ctrl_port, upstream_links[ctrl_port]) - def test_snmp(self, duthost, ctrl_links, upstream_links, creds, wait_mka_establish): + def test_snmp(self, duthost, ctrl_links, upstream_links, creds_all_duts, wait_mka_establish): ''' Verify SNMP request/response works across interface with macsec configuration ''' if duthost.is_multi_asic: pytest.skip("The test is for Single ASIC devices") - for ctrl_port, nbr in ctrl_links.items(): - if isinstance(nbr["host"], EosHost): - result = nbr["host"].eos_command( - commands=['show snmp community | include name']) - community = re.search(r'Community name: (\S+)', - result['stdout'][0]).groups()[0] - else: # vsonic neighbour - community = creds['snmp_rocommunity'] - - up_link = upstream_links[ctrl_port] + loopback0_ips = duthost.config_facts(host=duthost.hostname, + source="running")[ + "ansible_facts"].get( + "LOOPBACK_INTERFACE", + {}).get('Loopback0', {}) + for ip in loopback0_ips: + if isinstance(ipaddress.ip_network(ip), + ipaddress.IPv4Network): + dut_loip = ip.split('/')[0] + break + else: + pytest.fail("No Loopback0 IPv4 address for {}". + format(duthost.hostname)) + for ctrl_port, nbr in list(ctrl_links.items()): sysDescr = ".1.3.6.1.2.1.1.1.0" - command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format( - community, up_link["local_ipv4_addr"], sysDescr) - assert not duthost.command(command)["failed"] + result = get_snmp_output(dut_loip, duthost, nbr, + creds_all_duts, sysDescr) + assert not result["failed"] diff --git a/tests/snmp/test_snmp_loopback.py b/tests/snmp/test_snmp_loopback.py index 4176bd7e41..8498f62be9 100644 --- a/tests/snmp/test_snmp_loopback.py +++ b/tests/snmp/test_snmp_loopback.py @@ -1,10 +1,7 @@ import pytest import ipaddress -from tests.common.helpers.snmp_helpers import get_snmp_facts -try: # python3 - from shlex import quote -except ImportError: # python2 - from pipes import quote +from tests.common.helpers.snmp_helpers import get_snmp_facts, get_snmp_output +from tests.common.devices.eos import EosHost pytestmark = [ pytest.mark.topology('any'), @@ -12,27 +9,6 @@ ] -def get_snmp_output(ip, duthost, nbr, creds_all_duts): - ipaddr = ipaddress.ip_address(ip) - iptables_cmd = "iptables" - - # TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule. - if isinstance(ipaddr, ipaddress.IPv6Address): - iptables_cmd = "ip6tables" - return None - - ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip) - duthost.shell(ip_tbl_rule_add) - - eos_snmpget = "bash snmpget -v2c -c {} {} 1.3.6.1.2.1.1.1.0".format(quote(creds_all_duts[duthost.hostname]['snmp_rocommunity']), ip) - out = nbr['host'].eos_command(commands=[eos_snmpget]) - - ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip) - duthost.shell(ip_tbl_rule_del) - - return out - - @pytest.mark.bsl def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrhosts, tbinfo, localhost, creds_all_duts): """ @@ -58,6 +34,12 @@ def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrh continue result = get_snmp_output(loip, duthost, nbr, creds_all_duts) assert result is not None, 'No result from snmpget' - assert len(result[u'stdout_lines']) > 0, 'No result from snmpget' - assert "SONiC Software Version" in result[u'stdout_lines'][0][0], "Sysdescr not found in SNMP result from IP {}".format(ip) - assert snmp_facts['ansible_sysdescr'] in result[u'stdout_lines'][0][0], "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip) + assert len(result['stdout_lines']) > 0, 'No result from snmpget' + if isinstance(nbr["host"], EosHost): + stdout_lines = result['stdout_lines'][0][0] + else: + stdout_lines = result['stdout_lines'][0] + assert "SONiC Software Version" in stdout_lines,\ + "Sysdescr not found in SNMP result from IP {}".format(ip) + assert snmp_facts['ansible_sysdescr'] in stdout_lines,\ + "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)