-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwsn_data_django.py
49 lines (39 loc) · 1.43 KB
/
wsn_data_django.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import requests
from requests import exceptions
from mq import MQ, Pause
import waspmote
class Consumer(MQ):
    """Forward messages received from ``wsn_data`` to an HTTP endpoint.

    Every message is converted with ``waspmote.data_to_json`` and POSTed as
    JSON to the configured URL, authenticated with a token header.

    NOTE(review): ``self.config`` and ``self.warning`` are not defined in this
    file; presumably they are provided by the ``MQ`` base class -- confirm.
    """

    name = 'wsn_data_django'
    prefetch_count = 20

    def __init__(self):
        """Read the target URL and token from the config and open a session."""
        super().__init__()
        self.url = self.config['url']
        self.headers = {'Authorization': 'Token %s' % self.config['token']}
        # A single Session is shared by all requests sent by this consumer.
        self.session = requests.Session()

    def sub_to(self):
        """Return the subscription description for this consumer.

        The tuple holds ``'wsn_data'``, ``'fanout'``, this consumer's name and
        the message handler.

        NOTE(review): presumably (exchange, exchange type, queue, callback)
        as expected by ``MQ`` -- confirm against the base class.
        """
        return ('wsn_data', 'fanout', self.name, self.handle_message)

    def handle_message(self, data):
        """POST one message to the configured URL.

        Args:
            data: The message, in whatever form ``waspmote.data_to_json``
                accepts.

        Raises:
            Pause: If the request fails with a connection error or a read
                timeout (the error is logged as a warning first).
            AssertionError: If the response status code is not 201.
        """
        # RabbitMQ times out after 60s, so we must finish in less than 60s or
        # the connection to RabbitMQ will be broken.
        payload = waspmote.data_to_json(data)

        # 5s to connect, and 30s between reception of bytes.
        # XXX This does not guarantee that we'll finish below 60s, so there's
        # room for improvement.
        timeout = (5, 30)
        try:
            response = self.session.post(
                self.url,
                json=payload,
                headers=self.headers,
                timeout=timeout,
            )
        except (exceptions.ConnectionError, exceptions.ReadTimeout) as exc:
            self.warning(str(exc))
            raise Pause(5 * 60) from exc

        # Check the response. Raise explicitly instead of using an ``assert``
        # statement: asserts are stripped under ``python -O``, which would let
        # a failed delivery pass silently.
        status = response.status_code
        if status != 201:
            text = response.text
            raise AssertionError(
                'Unexpected status=%s text=%s' % (status, text)
            )
if __name__ == '__main__':
    # Script entry point: build the consumer, enter its context and start it;
    # the context is exited when start() returns or raises.
    with Consumer() as worker:
        worker.start()