Skip to content

Commit

Permalink
traps: allow trap filtering with multiple key values
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen M committed Oct 10, 2024
1 parent 23a019f commit b2d2c3a
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions src/SnmpLibrary/traps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pyasn1.codec.ber import decoder

from . import utils
from robot.api import logger


def _generic_trap_filter(domain, sock, pdu, **kwargs):
Expand All @@ -34,11 +35,18 @@ def _generic_trap_filter(domain, sock, pdu, **kwargs):
if sock[0] != kwargs['host']:
return False

for oid, val in v2c.apiPDU.getVarBindList(pdu):
for oid, val in v2c.apiPDU.getVarBinds(pdu):
if 'oid' in kwargs and kwargs['oid']:
if oid == snmpTrapOID:
if val[0][0][2] != v2c.ObjectIdentifier(kwargs['oid']):
if val != v2c.ObjectIdentifier(kwargs['oid']):
return False

if 'varBinds' in kwargs and kwargs['varBinds']:
for varBind in kwargs['varBinds']:
if oid == utils.parse_oid(varBind[0]):
if val != varBind[1]:
return False

return True


Expand All @@ -55,6 +63,7 @@ def _trap_receiver_cb(transport, domain, sock, msg):
raise RuntimeError('Only SNMP v2c traps are supported.')

req, msg = decoder.decode(msg, asn1Spec=v2c.Message())
logger.info(f"Trap Received: {req.prettyPrint()}")
pdu = v2c.apiMessage.getPDU(req)

# ignore any non trap PDUs
Expand Down Expand Up @@ -85,15 +94,20 @@ class _Traps:
def __init__(self):
self._trap_filters = dict()

def new_trap_filter(self, name, host=None, oid=None):
def new_trap_filter(self, name, host=None, oid=None, varBinds=None):
"""Defines a new SNMP trap filter.
At the moment, you can only filter on the sending host and on the trap
At the moment, you can filter on the sending host and on the trap
OID.
You can also filter on the variable bindings.
Here, varBinds is a list of tuples where each tuple is oid and value pair.
Eg : [(oid1,val1),(oid2,val2)]
"""
trap_filter = functools.partial(_generic_trap_filter,
host=host,
oid=utils.parse_oid(oid))
oid=utils.parse_oid(oid),
varBinds=varBinds)
self._trap_filters[name] = trap_filter

def wait_until_trap_is_received(self, trap_filter_name, timeout=5.0,
Expand Down

0 comments on commit b2d2c3a

Please sign in to comment.