diff --git a/tests/common/helpers/snmp_helpers.py b/tests/common/helpers/snmp_helpers.py index c66f7a04fd..907cec06d9 100644 --- a/tests/common/helpers/snmp_helpers.py +++ b/tests/common/helpers/snmp_helpers.py @@ -1,8 +1,10 @@ import logging +import ipaddress from tests.common.utilities import wait_until from tests.common.errors import RunAnsibleModuleFail from tests.common.helpers.assertions import pytest_assert +from tests.common.devices.eos import EosHost logger = logging.getLogger(__name__) @@ -42,3 +44,46 @@ def get_snmp_facts(localhost, host, version, community, is_dell=False, module_ig pytest_assert(wait_until(timeout, interval, 0, _update_snmp_facts, localhost, host, version, community, is_dell, include_swap), "Timeout waiting for SNMP facts") return global_snmp_facts + + +def get_snmp_output(ip, duthost, nbr, creds_all_duts, oid='.1.3.6.1.2.1.1.1.0'): + """ + Get snmp output from duthost using specific ip to query + snmp query is sent from neighboring ceos/vsonic + + Args: + ip(str): IP of dut to be used to send SNMP query + duthost: duthost + nbr: from where the snmp query should be executed + creds_all_duts: creds to get snmp_rocommunity of duthost + oid: to query + + Returns: + SNMP result + """ + ipaddr = ipaddress.ip_address(ip) + iptables_cmd = "iptables" + + # TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule. + if isinstance(ipaddr, ipaddress.IPv6Address): + iptables_cmd = "ip6tables" + return None + + ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format( + iptables_cmd, ip) + duthost.shell(ip_tbl_rule_add) + + if isinstance(nbr["host"], EosHost): + eos_snmpget = "bash snmpget -v2c -c {} {} {}".format( + creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid) + out = nbr['host'].eos_command(commands=[eos_snmpget]) + else: + command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format( + creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid) + out = nbr['host'].command(command) + + ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format( + iptables_cmd, ip) + duthost.shell(ip_tbl_rule_del) + + return out diff --git a/tests/macsec/test_interop_protocol.py b/tests/macsec/test_interop_protocol.py index 0c70a313ac..b5b7a65896 100644 --- a/tests/macsec/test_interop_protocol.py +++ b/tests/macsec/test_interop_protocol.py @@ -1,12 +1,12 @@ import pytest import logging -import re +import ipaddress from tests.common.utilities import wait_until -from tests.common.devices.eos import EosHost from .macsec_helper import getns_prefix from .macsec_config_helper import disable_macsec_port, enable_macsec_port from .macsec_platform_helper import find_portchannel_from_member, get_portchannel, get_lldp_list, sonic_db_cli +from tests.common.helpers.snmp_helpers import get_snmp_output logger = logging.getLogger(__name__) @@ -119,24 +119,28 @@ def check_bgp_established(ctrl_port, up_link): assert wait_until(BGP_TIMEOUT, BGP_KEEPALIVE, BGP_HOLDTIME, check_bgp_established, ctrl_port, upstream_links[ctrl_port]) - def test_snmp(self, duthost, ctrl_links, upstream_links, creds, wait_mka_establish): + def test_snmp(self, duthost, ctrl_links, upstream_links, creds_all_duts, wait_mka_establish): ''' Verify SNMP request/response works across interface with macsec configuration ''' if duthost.is_multi_asic: pytest.skip("The test is for Single ASIC devices") + loopback0_ips = duthost.config_facts(host=duthost.hostname, + source="running")[ + "ansible_facts"].get( + "LOOPBACK_INTERFACE", + {}).get('Loopback0', {}) + for ip in loopback0_ips: + if isinstance(ipaddress.ip_network(ip), + ipaddress.IPv4Network): + dut_loip = ip.split('/')[0] + break + else: + pytest.fail("No Loopback0 IPv4 address for {}". + format(duthost.hostname)) for ctrl_port, nbr in list(ctrl_links.items()): - if isinstance(nbr["host"], EosHost): - result = nbr["host"].eos_command( - commands=['show snmp community | include name']) - community = re.search(r'Community name: (\S+)', - result['stdout'][0]).groups()[0] - else: # vsonic neighbour - community = creds['snmp_rocommunity'] - - up_link = upstream_links[ctrl_port] sysDescr = ".1.3.6.1.2.1.1.1.0" - command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format( - community, up_link["local_ipv4_addr"], sysDescr) - assert not duthost.command(command)["failed"] + result = get_snmp_output(dut_loip, duthost, nbr, + creds_all_duts, sysDescr) + assert not result["failed"] diff --git a/tests/snmp/test_snmp_loopback.py b/tests/snmp/test_snmp_loopback.py index 5ae1d0e12a..68747d1994 100644 --- a/tests/snmp/test_snmp_loopback.py +++ b/tests/snmp/test_snmp_loopback.py @@ -1,10 +1,7 @@ import pytest import ipaddress -from tests.common.helpers.snmp_helpers import get_snmp_facts -try: # python3 - from shlex import quote -except ImportError: # python2 - from pipes import quote +from tests.common.helpers.snmp_helpers import get_snmp_facts, get_snmp_output +from tests.common.devices.eos import EosHost pytestmark = [ pytest.mark.topology('t0', 't1', 't2', 'm0', 'mx'), @@ -12,30 +9,6 @@ ] -def get_snmp_output(ip, duthost, nbr, creds_all_duts): - ipaddr = ipaddress.ip_address(ip) - iptables_cmd = "iptables" - - # TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule. - if isinstance(ipaddr, ipaddress.IPv6Address): - iptables_cmd = "ip6tables" - return None - - ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format( - iptables_cmd, ip) - duthost.shell(ip_tbl_rule_add) - - eos_snmpget = "bash snmpget -v2c -c {} {} 1.3.6.1.2.1.1.1.0".format( - quote(creds_all_duts[duthost.hostname]['snmp_rocommunity']), ip) - out = nbr['host'].eos_command(commands=[eos_snmpget]) - - ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format( - iptables_cmd, ip) - duthost.shell(ip_tbl_rule_del) - - return out - - @pytest.mark.bsl def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrhosts, tbinfo, localhost, creds_all_duts): @@ -67,7 +40,11 @@ def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, result = get_snmp_output(loip, duthost, nbr, creds_all_duts) assert result is not None, 'No result from snmpget' assert len(result['stdout_lines']) > 0, 'No result from snmpget' - assert "SONiC Software Version" in result['stdout_lines'][0][0],\ + if isinstance(nbr["host"], EosHost): + stdout_lines = result['stdout_lines'][0][0] + else: + stdout_lines = result['stdout_lines'][0] + assert "SONiC Software Version" in stdout_lines,\ "Sysdescr not found in SNMP result from IP {}".format(ip) - assert snmp_facts['ansible_sysdescr'] in result['stdout_lines'][0][ - 0], "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip) + assert snmp_facts['ansible_sysdescr'] in stdout_lines,\ + "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)