-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclientCollector.py
261 lines (219 loc) · 7.88 KB
/
clientCollector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import hashlib
import hmac
import json
import random
from abc import ABCMeta, abstractmethod
import threading
from time import strftime, gmtime
from twisted.internet import protocol
from pipot.encryption import Encryption
class ICollector:
"""
Interface that represents a uniform collector.
"""
__metaclass__ = ABCMeta
def __init__(self):
pass
@abstractmethod
def process_data(self, data):
"""
Server-side processing of received data.
:param data: A JSONified version of the data.
:type data: str
:return: None
:rtype: None
"""
pass
@abstractmethod
def queue_data(self, service_name, data):
"""
Client-side processing of data to send
:param service_name: The name of the service.
:type service_name: str
:param data: A JSON collection of data
:type data: dict
:return: None
:rtype: None
"""
pass
class ClientCollector(ICollector):
"""
Interface for submitting (sending) a message to the collector
"""
__metaclass__ = ABCMeta
""":type : threading.Lock"""
_collector_lock = threading.Lock()
def __init__(self, config, reactor):
"""
Creates an instance of the collector. Expects the JSON config,
an implementation of the CollectorMessage interface, and the
reactor of Twisted.
:param config: The JSON config for the collector.
:type config: dict[str]
:param reactor: Instance of the Twisted reactor
:type reactor: twisted.internet.interfaces.IReactorInThreads
"""
super(ClientCollector, self).__init__()
self._config = config
self._queue = []
self._reactor = reactor
self._closing = False
self._closing_done = False
# Start queue sender
reactor.callInThread(self._queue_monitor, random.randint(2, 10))
def halt_and_catch_fire(self):
"""
Sends a final message and flushes the queue.
:return: None
:rtype: None
"""
self.queue_data('PiPot', 'PiPot shutting down')
self._closing = True
while not self._closing_done:
import time
time.sleep(1)
def process_data(self, data):
pass
def queue_data(self, service_name, data):
"""
Adds a message from a given service to the queue, so it can be sent
at a random time interval.
:param service_name: The name of the service calling the method.
:type service_name: str
:param data: The message (can be an object too). Must be JSON
serializable.
:type data: Any
:return: None
:rtype: None
"""
with self._collector_lock:
self._queue.append({
'service': service_name,
'data': data,
'timestamp': strftime("%Y-%m-%d %H:%M:%S", gmtime())
})
def _queue_monitor(self, max_queue_length):
"""
Monitors the queue, and calls the submit_message of the collector
if either a certain time elapses, or if the queue length exceeds
the provided max_queue_length.
:param max_queue_length: The maximum amount of entries that the
queue should hold before sending them at once.
:type max_queue_length: int
:return: None
:rtype: None
"""
import time
print('Waiting until we collected %s items or are 5 minutes away '
'from now' % max_queue_length)
timeout = time.time() + 60*5 # 5 minutes from now
while len(self._queue) < max_queue_length and time.time() < timeout\
and not self._closing:
time.sleep(1)
if len(self._queue) > 0:
print('Sending queued messages')
queue = []
with self._collector_lock:
queue.extend(self._queue)
self._queue = []
self._submit_messages(queue)
if not self._closing:
print('Restarting queue_monitor')
self._reactor.callInThread(
self._queue_monitor,
random.randint(2, 10)
)
else:
self._closing_done = True
@abstractmethod
def _submit_messages(self, queue):
pass
def _encode_message(self, data):
"""
Converts the given data into a JSON encoded string for transmission
to the collector.
:param data: The messages to send. Must be JSON serializable.
:type data: Any
:return: A JSON-encoded string.
:rtype: str
"""
# Calculate MAC over content
mac = hmac.new(
str(self._config['mac_key']),
str(json.dumps(data, sort_keys=True)),
hashlib.sha256
).hexdigest()
# Return JSON of the auth_key & the encrypted messages
encrypted = Encryption.encrypt(
self._config['encryption_key'],
json.dumps({'hmac': mac, 'content': data})
)
return json.dumps({
'instance': self._config['instance_key'],
'data': encrypted
})
class ClientCollectorUDPProtocol(protocol.DatagramProtocol, ClientCollector):
"""
Class that implements both the DatagramProtocol from Twisted and the
CollectorMessage class, so it can send the messages to the collector
through UDP.
"""
def __init__(self, config, reactor):
"""
Initializes the instance of this UDP protocol. Requires a host &
port to connect to.
:param config: The JSON config for the collector.
:type config: dict[str]
:param reactor: Instance of the Twisted reactor
:type reactor: twisted.internet.interfaces.IReactorInThreads
"""
super(ClientCollectorUDPProtocol, self).__init__(
config=config, reactor=reactor)
self._udp_queue = []
def startProtocol(self):
self.transport.connect(self._config['host'], self._config['port'])
if len(self._udp_queue) > 0:
print("Sending queued UDP messages")
for message in self._udp_queue:
self.transport.write(self._encode_message(message))
self._udp_queue.remove(message)
def _submit_messages(self, queue):
# Need to check. Might not be initialized yet...
if self.transport is None:
print("UDP Transport is not initialized! Queueing message")
self._udp_queue.append(queue)
return
print("Sending message through UDP")
self.transport.write(self._encode_message(queue))
class CollectorSSLProtocol(protocol.Protocol):
"""
Simple TCP protocol for connecting to a collector through a TCP
connection
"""
def __init__(self, factory):
self.factory = factory
def connectionMade(self):
self.factory._collectors.append(self)
def connectionLost(self, reason=protocol.connectionDone):
self.factory._collectors.remove(self)
class ClientCollectorSSLFactory(protocol.ClientFactory, ClientCollector):
"""
TCP factory that implements the default ClientFactory, as well as the
CollectorMessage interface, so messages can be sent through TCP (SSL)
"""
""":type : list[protocol.Protocol]"""
_collectors = []
def __init__(self, config, reactor):
"""
:param config: The JSON config for the collector.
:type config: dict[str]
:param reactor: Instance of the Twisted reactor
:type reactor: twisted.internet.interfaces.IReactorInThreads
"""
super(ClientCollectorSSLFactory, self).__init__(
config=config, reactor=reactor)
def buildProtocol(self, addr):
return CollectorSSLProtocol(self)
def _submit_messages(self, queue):
for collector in self._collectors:
collector.transport.write(self._encode_message(queue))