-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.py
145 lines (117 loc) · 4.87 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#
# Copyright(c) 2023 Swisscom (Schweiz) AG
# Authors: Marco Tollini, Leonardo Rodoni
# Distributed under the MIT License (http://opensource.org/licenses/MIT)
#
# External Libraries
import logging
from time import time, sleep
# Internal Libraries
from packet_manager import PacketWithMetadata
from report import report
from proto import Proto
from proto_client import GenericTCPPClient, GenericUDPPClient, BGPPClient, BMPPClient, IPFIXPClient
class Client:
    """Reproduce captured packets towards the configured collectors.

    One protocol client ("pclient") is created for every protocol that has an
    entry in ``collectors``. Packets are either sent directly on the calling
    thread or, when a job queue is supplied, queued for a worker thread that
    runs :meth:`listen`.
    """

    def __init__(
        self,
        queue_th,
        network_map,
        collectors,
        optimize_network,
        network_interface,
    ) -> None:
        """Store the configuration and build the protocol clients.

        Args:
            queue_th: Job queue used in threading mode (must offer ``get``,
                ``put`` and ``qsize``), or ``None`` when not threading.
            network_map: Mapping providing the ``'src_ip'`` and ``'repro_ip'``
                entries.
            collectors: Mapping keyed by ``Proto`` values; each value is the
                collector configuration handed to the matching pclient.
            optimize_network: Mapping providing the ``'so_sndbuf'`` and
                ``'so_rcvbuf'`` entries.
            network_interface: Network interface setting, stored as both
                ``network_interface`` and ``interface``.
        """
        self.job_queue = queue_th  # can be None if no threading
        self.network_map = network_map
        self.collectors = collectors
        self.optimize_network = optimize_network
        self.network_interface = network_interface

        self.src_ip = network_map['src_ip']
        self.repro_ip = network_map['repro_ip']
        self.interface = network_interface
        self.so_sndbuf = optimize_network['so_sndbuf']
        self.so_rcvbuf = optimize_network['so_rcvbuf']

        # Maps a protocol value to the pclient responsible for it.
        self.pclients = {}

        # The pclients receive this instance in their constructor, so they are
        # only built once every attribute above has been assigned.
        configurators = (
            (Proto.tcp_generic, self._generic_tcp_client_config),
            (Proto.udp_generic, self._generic_udp_client_config),
            (Proto.bgp, self._bgp_client_config),
            (Proto.bmp, self._bmp_client_config),
            (Proto.ipfix, self._ipfix_client_config),
        )
        for proto, configure in configurators:
            if proto.value in collectors:
                configure()

    def _register_pclient(self, proto, pclient_cls):
        """Build ``pclient_cls`` for ``proto`` and store it in ``pclients``."""
        key = proto.value
        self.pclients[key] = pclient_cls(self.collectors[key], self)

    def _generic_tcp_client_config(self):
        """Create the generic TCP pclient."""
        self._register_pclient(Proto.tcp_generic, GenericTCPPClient)

    def _generic_udp_client_config(self):
        """Create the generic UDP pclient."""
        self._register_pclient(Proto.udp_generic, GenericUDPPClient)

    def _bgp_client_config(self):
        """Create the BGP pclient."""
        self._register_pclient(Proto.bgp, BGPPClient)

    def _bmp_client_config(self):
        """Create the BMP pclient."""
        self._register_pclient(Proto.bmp, BMPPClient)

    def _ipfix_client_config(self):
        """Create the IPFIX pclient."""
        self._register_pclient(Proto.ipfix, IPFIXPClient)

    def _send(self, packetwm: PacketWithMetadata):
        """Send ``packetwm`` right away through the pclient of its protocol.

        Called directly by :meth:`reproduce` when there is no job queue and by
        :meth:`listen` for every queued packet.

        Returns:
            Whatever the pclient's ``send`` returns.

        Raises:
            KeyError: If no pclient exists for the packet's protocol.
        """
        proto = packetwm.type
        payload = self.get_payload(packetwm)
        return self.pclients[proto].send(payload, proto)

    def get_payload(self, packetwm: PacketWithMetadata):
        """Return the payload that should be sent to the collector.

        The payload is produced by the pclient registered for the packet's
        protocol.

        Raises:
            KeyError: If no pclient exists for the packet's protocol.
        """
        proto = packetwm.type
        return self.pclients[proto].get_payload(packetwm)

    def listen(self):
        """Worker loop for threading mode.

        Blocks on the job queue and sends each packet it receives until a
        ``None`` item (the exit signal) is dequeued. After every send the
        current queue size is passed to ``report.set_queue_size``.
        """
        while True:
            packetwm = self.job_queue.get(block=True)
            if packetwm is None:
                break  # exit signal
            self._send(packetwm)
            report.set_queue_size(self.job_queue.qsize())

        logging.info(f'{self.repro_ip} thread exited')

    def _wait_for_pcap_second(self, packetwm: PacketWithMetadata):
        """Sleep until the wall clock reaches the packet's second-of-minute.

        The wait is the distance (modulo 60 s, rounded to 1 ms) between the
        current second within the minute and the second within the minute of
        the packet's capture time.
        """
        now = time()
        now_second_from_min = now % 60
        pkt_second_from_min = float(packetwm.packet.time) % 60
        sleep_time = round((pkt_second_from_min - now_second_from_min) % 60, 3)

        # Some debug logs
        logging.debug(f"Second in first packet in the pcap: {pkt_second_from_min}")
        logging.debug(f"Second in current minute: {now_second_from_min}")
        logging.debug(f"Sleep time: {sleep_time}s [in 1ms steps]")

        # Sleep in steps of at most one second so progress can be logged.
        # The remaining time is re-derived from a fixed deadline on every
        # iteration: sleep() may overshoot and logging takes time, so
        # subtracting a constant 1s per step would make us start late.
        deadline = now + sleep_time
        while sleep_time > 0:
            logging.info(f"Waiting an additional {sleep_time}s to sync IPFIX to pmacct bucket")
            sleep(min(sleep_time, 1))
            sleep_time = round(deadline - time(), 3)

    def reproduce(self, packetwm: PacketWithMetadata, should_sync_ipfix):
        """Reproduce a packet, queueing it in threading mode or sending it now.

        Args:
            packetwm: Packet to reproduce; its ``type`` selects the pclient.
            should_sync_ipfix: When true, first wait until the current second
                within the minute matches the one of the packet in the pcap,
                so that the replay lands in the same bucket. According to the
                original author this is requested only once across all
                clients (for the first packet).

        Returns:
            ``-1`` if no pclient exists for the packet's protocol, ``0`` if
            the packet was put on the job queue (i.e. it was not discarded),
            otherwise the result of sending the packet directly.
        """
        i = packetwm.number
        proto = packetwm.type

        if proto not in self.pclients:
            logging.critical(f"[{i}][{self.repro_ip}][{proto}] No clients found for protocol - SHOULD NEVER HAPPEN")
            return -1

        if should_sync_ipfix:
            self._wait_for_pcap_second(packetwm)

        if self.job_queue is not None:
            # threading mode, add to queue
            self.job_queue.put(packetwm)
            # return 0 to signify that packet was not discarded
            return 0

        return self._send(packetwm)