Skip to content

Commit

Permalink
[202205]Use device loopback IP address to send SNMP query from neighb…
Browse files Browse the repository at this point in the history
…oring ceos or vsonic (#8802) (#8972)

What is the motivation for this PR?
cherry-pick of #8802
sonic-net/sonic-buildimage#15487 modifies snmpd in SONiC to listen on management and loopback ip by default instead of listening on any IP. test_interop_protocol.py::test_snmp sends a query to the neighbor device using the link ip address. After the above change to listen on management and loopback ip, the snmp query will fail. Hence, modifying the test to send SNMP query to SONiC DUT from neighboring vsonic/eos using Loopback IP of SONiC DUT.

How did you do it?
Get loopback IPv4 address DUT and use that to query from neighbor device. Added a generic helped function to send SNMP query to DUT from a neighbor.

(cherry picked from commit d66965a)

How did you verify/test it?
Tested on single asic VS testbed.

Signed-off-by: Suvarna Meenakshi <[email protected]>
  • Loading branch information
SuvarnaMeenakshi authored Jul 14, 2023
1 parent d440721 commit 712f338
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 45 deletions.
45 changes: 45 additions & 0 deletions tests/common/helpers/snmp_helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import ipaddress

from tests.common.utilities import wait_until
from tests.common.errors import RunAnsibleModuleFail
from tests.common.helpers.assertions import pytest_assert
from tests.common.devices.eos import EosHost

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,3 +42,46 @@ def get_snmp_facts(localhost, host, version, community, is_dell=False, module_ig
pytest_assert(wait_until(timeout, interval, 0, _update_snmp_facts, localhost, host, version,
community, is_dell, include_swap), "Timeout waiting for SNMP facts")
return global_snmp_facts


def get_snmp_output(ip, duthost, nbr, creds_all_duts, oid='.1.3.6.1.2.1.1.1.0'):
"""
Get snmp output from duthost using specific ip to query
snmp query is sent from neighboring ceos/vsonic
Args:
ip(str): IP of dut to be used to send SNMP query
duthost: duthost
nbr: from where the snmp query should be executed
creds_all_duts: creds to get snmp_rocommunity of duthost
oid: to query
Returns:
SNMP result
"""
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

if isinstance(nbr["host"], EosHost):
eos_snmpget = "bash snmpget -v2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].eos_command(commands=[eos_snmpget])
else:
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].command(command)

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out
36 changes: 20 additions & 16 deletions tests/macsec/test_interop_protocol.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from time import sleep
import pytest
import logging
import re
import scapy.all as scapy
import ptf.testutils as testutils
from collections import Counter
import ipaddress

from tests.common.utilities import wait_until
from tests.common.devices.eos import EosHost
from tests.common import config_reload
from macsec_helper import *
from macsec_config_helper import *
from macsec_platform_helper import *
from tests.common.helpers.snmp_helpers import get_snmp_output

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,24 +121,28 @@ def check_bgp_established(ctrl_port, up_link):
assert wait_until(BGP_TIMEOUT, BGP_KEEPALIVE, BGP_HOLDTIME,
check_bgp_established, ctrl_port, upstream_links[ctrl_port])

def test_snmp(self, duthost, ctrl_links, upstream_links, creds, wait_mka_establish):
def test_snmp(self, duthost, ctrl_links, upstream_links, creds_all_duts, wait_mka_establish):
'''
Verify SNMP request/response works across interface with macsec configuration
'''
if duthost.is_multi_asic:
pytest.skip("The test is for Single ASIC devices")

for ctrl_port, nbr in ctrl_links.items():
if isinstance(nbr["host"], EosHost):
result = nbr["host"].eos_command(
commands=['show snmp community | include name'])
community = re.search(r'Community name: (\S+)',
result['stdout'][0]).groups()[0]
else: # vsonic neighbour
community = creds['snmp_rocommunity']

up_link = upstream_links[ctrl_port]
loopback0_ips = duthost.config_facts(host=duthost.hostname,
source="running")[
"ansible_facts"].get(
"LOOPBACK_INTERFACE",
{}).get('Loopback0', {})
for ip in loopback0_ips:
if isinstance(ipaddress.ip_network(ip),
ipaddress.IPv4Network):
dut_loip = ip.split('/')[0]
break
else:
pytest.fail("No Loopback0 IPv4 address for {}".
format(duthost.hostname))
for ctrl_port, nbr in list(ctrl_links.items()):
sysDescr = ".1.3.6.1.2.1.1.1.0"
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
community, up_link["local_ipv4_addr"], sysDescr)
assert not duthost.command(command)["failed"]
result = get_snmp_output(dut_loip, duthost, nbr,
creds_all_duts, sysDescr)
assert not result["failed"]
40 changes: 11 additions & 29 deletions tests/snmp/test_snmp_loopback.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,14 @@
import pytest
import ipaddress
from tests.common.helpers.snmp_helpers import get_snmp_facts
try: # python3
from shlex import quote
except ImportError: # python2
from pipes import quote
from tests.common.helpers.snmp_helpers import get_snmp_facts, get_snmp_output
from tests.common.devices.eos import EosHost

pytestmark = [
pytest.mark.topology('any'),
pytest.mark.device_type('vs')
]


def get_snmp_output(ip, duthost, nbr, creds_all_duts):
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

eos_snmpget = "bash snmpget -v2c -c {} {} 1.3.6.1.2.1.1.1.0".format(quote(creds_all_duts[duthost.hostname]['snmp_rocommunity']), ip)
out = nbr['host'].eos_command(commands=[eos_snmpget])

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out


@pytest.mark.bsl
def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrhosts, tbinfo, localhost, creds_all_duts):
"""
Expand All @@ -58,6 +34,12 @@ def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrh
continue
result = get_snmp_output(loip, duthost, nbr, creds_all_duts)
assert result is not None, 'No result from snmpget'
assert len(result[u'stdout_lines']) > 0, 'No result from snmpget'
assert "SONiC Software Version" in result[u'stdout_lines'][0][0], "Sysdescr not found in SNMP result from IP {}".format(ip)
assert snmp_facts['ansible_sysdescr'] in result[u'stdout_lines'][0][0], "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)
assert len(result['stdout_lines']) > 0, 'No result from snmpget'
if isinstance(nbr["host"], EosHost):
stdout_lines = result['stdout_lines'][0][0]
else:
stdout_lines = result['stdout_lines'][0]
assert "SONiC Software Version" in stdout_lines,\
"Sysdescr not found in SNMP result from IP {}".format(ip)
assert snmp_facts['ansible_sysdescr'] in stdout_lines,\
"Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)

0 comments on commit 712f338

Please sign in to comment.