-
Notifications
You must be signed in to change notification settings - Fork 39
/
netsnmpvartypes.py
280 lines (230 loc) · 8.08 KB
/
netsnmpvartypes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#
# python-netsnmpagent module
# Copyright (c) 2013-2019 Pieter Hollants <[email protected]>
# Licensed under the GNU Lesser General Public License (LGPL) version 3
#
# SNMP scalar variable types
#
import ctypes, socket, struct
from netsnmpapi import *
# Maximum string size (in bytes) supported by python-netsnmpagent. _String
# passes this value as the max_size of every string-typed variable.
MAX_STRING_SIZE = 1024
# Helper function to determine if "x" is a number
def isnum(x):
    """Report whether "x" can take part in integer addition.

    Returns True if evaluating ``x + 1`` succeeds and False if it raises a
    TypeError. Any other exception raised by the addition propagates.
    """
    try:
        x + 1
    except TypeError:
        return False
    else:
        return True
# Base class for scalar SNMP variables.
# This class is not supposed to be instantiated directly.
class _VarType(object):
    """Common base of the scalar SNMP variable classes in this module.

    Inheriting classes are expected to have stored a ctypes object in
    self._cvar before value() is called.
    """

    def value(self):
        """Return the content of self._cvar as a Python value.

        Non-numeric content is passed through u() (imported from netsnmpapi;
        presumably a text conversion helper -- confirm there). Floats are
        returned as they are, every other number is converted with int().
        """
        raw = self._cvar.value
        if not isnum(raw):
            return u(raw)
        if isinstance(raw, float):
            return raw
        # Python 2.x transparently promotes "int" to "long" when needed,
        # and Python 3.x ints are unbounded, so int() is always sufficient.
        return int(raw)
# Intermediate class for scalar SNMP variables of fixed size.
# This class is not supposed to be instantiated directly.
class _FixedSizeVarType(_VarType):
    """Intermediate base for scalar SNMP variables of fixed size.

    Inheriting classes must set self._ctype (a ctypes type or factory)
    before calling this class' __init__().
    """

    def __init__(self, initval):
        """Create the ctypes object backing the variable.

        initval is handed to self._ctype unchanged if isnum() considers it a
        number; otherwise it is first passed through b() from netsnmpapi.
        """
        # Create the ctypes class instance representing the variable
        # for handling by the net-snmp C API.
        self._cvar = self._ctype(initval if isnum(initval) else b(initval))
        self._data_size = ctypes.sizeof(self._cvar)
        self._max_size = self._data_size

        # Flags for the netsnmp_watcher_info structure
        self._watcher_flags = WATCHER_FIXED_SIZE

        # Deliberately no "return self": __init__() has to return None,
        # anything else makes Python raise a TypeError on instantiation.

    def cref(self, **kwargs):
        """Return a ctypes by-reference handle to the variable.

        Keyword arguments are accepted for interface compatibility with
        other variable types and ignored here.
        """
        return ctypes.byref(self._cvar)

    def update(self, val):
        """Store "val" as the new content of the variable."""
        self._cvar.value = val
class Integer32(_FixedSizeVarType):
    """Scalar of ASN type ASN_INTEGER, backed by a ctypes.c_long."""

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_long, ASN_INTEGER
        super(Integer32, self).__init__(initval)
class Unsigned32(_FixedSizeVarType):
    """Scalar of ASN type ASN_UNSIGNED, backed by a ctypes.c_ulong."""

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_ulong, ASN_UNSIGNED
        super(Unsigned32, self).__init__(initval)
class Counter32(_FixedSizeVarType):
    """Scalar of ASN type ASN_COUNTER, backed by a ctypes.c_ulong.

    Values that do not fit into 32 bits wrap around.
    """

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_ulong, ASN_COUNTER
        super(Counter32, self).__init__(initval)

    def update(self, val):
        """Store "val", keeping only its low 32 bits if it is wider."""
        # Anything with bits set beyond the low 32 is cut off
        wrapped = val & 0xFFFFFFFF if val >> 32 else val
        super(Counter32, self).update(wrapped)

    def increment(self, count=1):
        """Add "count" to the current value, subject to update()'s wrapping."""
        raised = self.value() + count
        self.update(raised)
class Counter64(_FixedSizeVarType):
    """Scalar of ASN type ASN_COUNTER64, backed by netsnmpapi's counter64.

    Values that do not fit into 64 bits wrap around.
    """

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = counter64, ASN_COUNTER64
        super(Counter64, self).__init__(initval)

    def update(self, val):
        """Store "val", keeping only its low 64 bits if it is wider."""
        # Anything with bits set beyond the low 64 is cut off
        wrapped = val & 0xFFFFFFFFFFFFFFFF if val >> 64 else val
        super(Counter64, self).update(wrapped)

    def increment(self, count=1):
        """Add "count" to the current value, subject to update()'s wrapping."""
        raised = self.value() + count
        self.update(raised)
class Gauge32(_FixedSizeVarType):
    """Scalar of ASN type ASN_GAUGE, backed by a ctypes.c_ulong.

    Values that do not fit into 32 bits are pinned to 0xFFFFFFFF.
    """

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_ulong, ASN_GAUGE
        super(Gauge32, self).__init__(initval)

    def update(self, val):
        """Store "val", or 0xFFFFFFFF if "val" is wider than 32 bits.

        The width test is "val >> 32" being non-zero, which also holds for
        negative integers; those are therefore stored as 0xFFFFFFFF too.
        """
        # Restrict values larger than 32 bits to ULONG_MAX
        capped = 0xFFFFFFFF if val >> 32 else val
        super(Gauge32, self).update(capped)

    def increment(self, count=1):
        """Add "count" to the current value, subject to update()'s capping."""
        raised = self.value() + count
        self.update(raised)
class TimeTicks(_FixedSizeVarType):
    """Scalar of ASN type ASN_TIMETICKS, backed by a ctypes.c_ulong."""

    def __init__(self, initval=0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_ulong, ASN_TIMETICKS
        super(TimeTicks, self).__init__(initval)
# RFC 2579 TruthValues should offer a bool interface to Python but
# are stored as Integers using the special constants TV_TRUE and TV_FALSE
class TruthValue(_FixedSizeVarType):
    """Boolean scalar stored as an ASN_INTEGER holding TV_TRUE or TV_FALSE.

    Python code reads and writes bool values; the constants TV_TRUE and
    TV_FALSE only appear in the underlying ctypes.c_int.
    """

    def __init__(self, initval=False):
        """Initialise with TV_TRUE if "initval" is truthy, else TV_FALSE."""
        self._ctype, self._asntype = ctypes.c_int, ASN_INTEGER
        if initval:
            stored = TV_TRUE
        else:
            stored = TV_FALSE
        super(TruthValue, self).__init__(stored)

    def value(self):
        """Return True if the stored integer equals TV_TRUE, else False."""
        return bool(self._cvar.value == TV_TRUE)

    def update(self, val):
        """Store the bool "val".

        Raises netsnmpAgentException if "val" is not a bool instance.
        """
        if not isinstance(val, bool):
            raise netsnmpAgentException("TruthValue must be True or False")
        self._cvar.value = TV_TRUE if val else TV_FALSE
class Float(_FixedSizeVarType):
    """Scalar of ASN type ASN_OPAQUE_FLOAT, backed by a ctypes.c_float."""

    def __init__(self, initval=0.0):
        """Set up the type information, then initialise with "initval"."""
        self._ctype, self._asntype = ctypes.c_float, ASN_OPAQUE_FLOAT
        super(Float, self).__init__(initval)
# IP v4 addresses are stored as unsigned integers but we want the Python
# interface to use strings.
class IpAddress(_FixedSizeVarType):
    """IPv4 address scalar of ASN type ASN_IPADDRESS.

    The address is kept in a ctypes.c_uint whose bytes are those produced
    by socket.inet_aton(); Python code reads and writes dotted decimal
    strings.
    """

    def __init__(self, initval="0.0.0.0"):
        """Create the variable zeroed, then store "initval" via update()."""
        self._ctype, self._asntype = ctypes.c_uint, ASN_IPADDRESS
        super(IpAddress, self).__init__(0)
        self.update(initval)

    def value(self):
        """Return the address as a dotted decimal string."""
        packed = struct.pack("I", self._cvar.value)
        return socket.inet_ntoa(packed)

    def cref(self, **kwargs):
        """Return a ctypes by-reference handle to the address.

        With the keyword argument is_table_index set, the handle refers to
        a temporary copy converted to host byte order instead of the
        variable itself.
        """
        if kwargs.get("is_table_index", False) == False:
            return ctypes.byref(self._cvar)

        # Due to an unfixed Net-SNMP issue (see
        # https://sourceforge.net/p/net-snmp/bugs/2136/) the value has to
        # be converted to host byte order if it shall be used as table
        # index: pack it big-endian, then reread those bytes natively.
        network_bytes = struct.pack("!I", self._cvar.value)
        host_order = ctypes.c_uint(0)
        host_order.value = struct.unpack("I", network_bytes)[0]
        return ctypes.byref(host_order)

    def update(self, val):
        """Store the dotted decimal address "val".

        A falsy "val" (e.g. None or "") is treated as "0.0.0.0".
        """
        # inet_aton() yields the address in network byte order; those four
        # bytes are reinterpreted as a native unsigned int for ctypes.
        dotted = val if val else "0.0.0.0"
        self._cvar.value = struct.unpack("I", socket.inet_aton(dotted))[0]
# Intermediate class for scalar SNMP variables of variable size.
# This class is not supposed to be instantiated directly.
class _MaxSizeVarType(_VarType):
    """Intermediate base for scalar SNMP variables of variable size.

    Inheriting classes must set self._ctype (a ctypes type or factory that
    accepts an initial value and a size) before calling this class'
    __init__().
    """

    def __init__(self, initval, max_size):
        """Create the ctypes object backing the variable.

        initval is handed to self._ctype unchanged if isnum() considers it a
        number; otherwise it is first passed through b() from netsnmpapi.
        max_size is passed as second argument because the variable is
        assumed to have no fixed size.
        """
        # Create the ctypes class instance representing the variable
        # for handling by the net-snmp C API.
        self._cvar = self._ctype(initval if isnum(initval) else b(initval), max_size)
        self._data_size = len(self._cvar.value)
        self._max_size = max(self._data_size, max_size)

        # Flags for the netsnmp_watcher_info structure
        self._watcher_flags = WATCHER_MAX_SIZE

        # Deliberately no "return self": __init__() has to return None,
        # anything else makes Python raise a TypeError on instantiation.

    def cref(self, **kwargs):
        """Return the ctypes object itself.

        Keyword arguments are accepted for interface compatibility with
        other variable types and ignored here.
        """
        return self._cvar

    def update(self, val):
        """Store "val" and record its length as the current data size.

        Raises netsnmpAgentException if "val" is longer than the maximum
        size; the variable is left unchanged in that case. Requires
        self._watcher to have been set (it is not set in this class).
        """
        # Validate before touching the buffer: writing an oversized value
        # into a ctypes string buffer fails with a ValueError of its own,
        # so checking afterwards could never report the problem.
        if len(val) > self._max_size:
            raise netsnmpAgentException(
                "Value passed to update() truncated: {0} > {1} "
                "bytes!".format(len(val), self._max_size)
            )
        self._cvar.value = val
        self._data_size = self._watcher.contents.data_size = len(val)
class _String(_MaxSizeVarType):
    """Shared base of the string-like variables (ASN type ASN_OCTET_STR).

    The content lives in a mutable ctypes string buffer with a capacity of
    MAX_STRING_SIZE.
    """

    def __init__(self, initval=""):
        """Set up the type information, then initialise with "initval"."""
        # ctypes.c_char_p is not an option here: it creates an immutable
        # type, whereas net-snmp _can_ modify the buffer (unless writable
        # is False).
        self._ctype = ctypes.create_string_buffer
        self._asntype = ASN_OCTET_STR

        # net-snmp 5.5 introduced a WATCHER_SIZE_STRLEN flag, but we have
        # to stick to WATCHER_MAX_SIZE for now to support net-snmp 5.4.x
        # (used eg. in SLES 11 SP2 and Ubuntu 12.04 LTS).
        self._flags = WATCHER_MAX_SIZE

        super(_String, self).__init__(initval, MAX_STRING_SIZE)
# Whereas an OctetString can contain all byte values, a DisplayString is
# restricted to ASCII characters only.
class OctetString(_String):
    """String variable whose value is read back as raw bytes."""

    def __init__(self, initval=""):
        """Initialise with "initval" and record its full length."""
        super(OctetString, self).__init__(initval)
        # Replace the size computed by the base class with the length of
        # the converted initial value (presumably because the buffer's
        # "value" stops at the first NUL byte -- confirm).
        self._data_size = len(b(initval))

    def value(self):
        """Return the first data_size bytes of the raw buffer.

        The size is taken from the watcher structure if self._watcher
        exists, otherwise from self._data_size.
        """
        if hasattr(self, "_watcher"):
            used = self._watcher.contents.data_size
        else:
            used = self._data_size
        return b(self._cvar.raw[:used])
class DisplayString(_String):
    """String variable that adds nothing to the behaviour of _String."""
class Bits(OctetString):
    """Bit set exposed to Python as an integer, stored as an octet string.

    Bit 0 of the integer ends up as the most significant bit of the first
    stored byte, bit 8 as the most significant bit of the second, and so on.
    """
    # RFC 2578 - 7.1.4 The BITS construct

    def __init__(self, initval=None):
        """Initialise from the integer "initval"; None means no bits."""
        encoded = Bits._to_bytes(initval)
        super().__init__(encoded)

    @staticmethod
    def _reverse(data):
        """
        Reverse bits in byte
        """
        # Multiply-and-mask bit reversal: spread copies of the byte, pick
        # the bits in mirrored order, then fold them into a single byte.
        spread = (data * 0x0802 & 0x22110) | (data * 0x8020 & 0x88440)
        return (spread * 0x10101 >> 16) & 0xff

    @staticmethod
    def _to_bytes(value):
        """
        Convert integer value to bytes according to SNMP BITS format
        """
        if value is None:
            return b''
        # As many bytes as are needed to hold the highest set bit
        length = (value.bit_length() + 7) // 8
        little_endian = value.to_bytes(length, byteorder='little')
        return bytes(map(Bits._reverse, little_endian))

    @staticmethod
    def _from_bytes(data):
        """
        Convert SNMP BITS data to integer
        """
        mirrored = bytes(map(Bits._reverse, data))
        return int.from_bytes(mirrored, byteorder='little')

    def update(self, val):
        """Store the integer "val" (or None) in BITS encoding."""
        encoded = Bits._to_bytes(val)
        super().update(encoded)

    def value(self):
        """Return the stored BITS data as an integer."""
        stored = super().value()
        return Bits._from_bytes(stored)