forked from kytos-ng/flow_stats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
111 lines (87 loc) · 3.53 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Utility classes and definitions."""
import struct
from napps.amlight.sdntrace.shared.singleton import Singleton
class Flows(metaclass=Singleton):
"""Class to store all flows installed in the switches."""
def __init__(self):
"""Institate an empty flow dict."""
self._flows = {}
def clear(self, dpid):
"""Clear the list of flows of the given switch."""
self._flows[dpid] = []
def add_flow(self, dpid, flow):
"""Add a flow to the list of flows of the given switch."""
if dpid not in self._flows:
self._flows[dpid] = []
self._flows[dpid].append(flow)
def get_flows(self, dpid):
"""Return the list of flows of the given switch."""
if dpid in self._flows:
return self._flows[dpid]
return None
def sort(self, dpid):
"""Sort the list of flows of the given switch by priority."""
if dpid in self._flows:
self._flows[dpid].sort(key=lambda f: f.priority, reverse=True)
class IPv4AddressWithMask:
"""Class to represent an IPv4 address with netmask."""
def __init__(self, address=0, netmask=0):
"""Instatiate with given ip address and netmask."""
self.address = address
self.netmask = netmask
def __repr__(self):
return f'<{self.__class__.__name__}: {self.as_dot_string()}>'
def __str__(self):
return self.as_dot_string()
def as_dot_string(self):
# pylint: disable=W0631
"""Represent an IPv4 address with mask as 0.0.0.0/0."""
packed = struct.pack('!I', self.address)
unpacked_bytes = struct.unpack('!4B', packed)
address = '.'.join([str(x) for x in unpacked_bytes])
for i in range(1, 33):
stripped_mask = self.netmask >> i << i
if stripped_mask != self.netmask:
break
mask = 33 - i
return f'{address}/{mask}'
def unpack(self, buffer, start=0):
"""Unpack IPv4 address and netmask."""
self.address, self.netmask = struct.unpack('!2I',
buffer[start:start+8])
class IPv6AddressWithMask:
"""Class to represent an IPv6 address with netmask."""
def __init__(self, address=0, netmask=0):
"""Instantiate with given ip address and netmask."""
self.address = address
self.netmask = netmask
def __repr__(self):
return f'<{self.__class__.__name__}: {self.as_comma_string()}>'
def __str__(self):
return self.as_comma_string()
def as_comma_string(self):
# pylint: disable=W0631
"""Represent an IPv6 address with mask as ffff::0/0."""
address = []
addrs = divmod(self.address, 2**64)
for addr in addrs:
packed = struct.pack('!Q', addr)
unpacked_bytes = struct.unpack('!4H', packed)
address.append(':'.join([f'{b:x}' for b in unpacked_bytes]))
address = ':'.join(address)
for i in range(1, 129):
stripped_mask = self.netmask >> i << i
if stripped_mask != self.netmask:
break
mask = 129 - i
return f'{address}/{mask}'
def unpack(self, buffer, start=0):
"""Unpack IPv6 address and mask."""
self.address = int.from_bytes(buffer[start:start+16], 'big')
self.netmask = int.from_bytes(buffer[start+16:start+32], 'big')
def format_request(request):
"""Format user request to match function format."""
args = {}
for key, value in request.items():
args[key] = value
return args