chainProcessor.py
#!/usr/bin/python
from lib.processes import *  # brings each process module into globals()
import zmq
import sys
import requests
import json
import pkgutil
import logging
from multiprocessing import Process

from chaincrawler import chainCrawler, chainSearch
from chainlearnairdata import chainTraversal

#log to stderr; this module's logger emits INFO and above via its own handler
logging.basicConfig(stream=sys.stderr)
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.propagate = 0
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
log.addHandler(ch)

##
#CREATE CRAWLER PROCESS THAT SEARCHES FOR SENSORS AND PUSHES THEM OVER ZMQ
##
def create_crawler_process(socket="tcp://127.0.0.1:5557",
                           namespace='http://learnair.media.mit.edu:8000/rels/',
                           criteria=None):
    return Process(target=crawler_spawn, args=(socket, namespace, criteria))

def crawler_spawn(socket, namespace, criteria):
    #TODO: the crawler currently only supports finding one type of object.
    # It would be better to pass in a list of objects to find (e.g. the list
    # of process names) instead of crawling for one particular type.
    crawler = chainCrawler.ChainCrawler(
        entry_point='http://learnair.media.mit.edu:8000/sites/3')
    if criteria is not None:
        crawler.crawl_zmq(socket=socket, namespace=namespace,
                          resource_extra=criteria)
    else:
        crawler.crawl_zmq(socket=socket, namespace=namespace,
                          resource_type='Sensor')
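
# A possible shape for the TODO above -- a hypothetical sketch, assuming
# crawl_zmq accepts only one resource_type per crawl: spawn one crawler
# process per resource type, all pushing to the same ZMQ socket so the main
# process sees a single stream of URIs. Helper names here are illustrative.
def create_crawler_processes(resource_types,
                             socket="tcp://127.0.0.1:5557",
                             namespace='http://learnair.media.mit.edu:8000/rels/'):
    return [Process(target=typed_crawler_spawn, args=(socket, namespace, r))
            for r in resource_types]

def typed_crawler_spawn(socket, namespace, resource_type):
    crawler = chainCrawler.ChainCrawler(
        entry_point='http://learnair.media.mit.edu:8000/sites/3')
    crawler.crawl_zmq(socket=socket, namespace=namespace,
                      resource_type=resource_type)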

##
#CREATE MAIN PROCESS THAT RECEIVES SENSOR URIS, CHECKS THEM AGAINST THE
#PROCESSES WE HAVE TO RUN, SENDS DATA TO A SECONDARY PROCESS, AND PUBLISHES
#DATA FROM THE SECONDARY PROCESS TO 'VIRTUAL' SENSORS
##
def create_main_process(socket="tcp://127.0.0.1:5557"):
    return Process(target=main_spawn, args=(socket,))
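
# For a quick manual test of main_spawn without the crawler, one can push a
# sensor URI into the same socket from another shell (hypothetical URI; the
# PULL side connects, so the test PUSH binds):
#
#   ctx = zmq.Context()
#   push = ctx.socket(zmq.PUSH)
#   push.bind("tcp://127.0.0.1:5557")
#   push.send_string('http://learnair.media.mit.edu:8000/sensors/1')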

def main_spawn(socket):
    #names of the modules in lib/processes (these should match the
    #sensor_types of the sensors we're interested in)
    processes = [name for _, name, _ in pkgutil.iter_modules(['lib/processes'])]

    context = zmq.Context()
    zmqReceive = context.socket(zmq.PULL)
    zmqReceive.connect(socket)

    while True:
        uri = zmqReceive.recv_string()

        #retrieve uri, put into json
        res_json = get_json_from_uri(uri)

        #check that sensor_type matches a process and that the sensor has
        #the attributes the process needs
        process = check_sensor_type_has_process(res_json, processes)
        metric = get_attribute(res_json, 'metric')
        unit = get_attribute(res_json, 'unit')
        if process is None or metric is None or unit is None:
            continue

        #check if process requires extra data
        aux_data = globals()[process].required_aux_data(metric, unit)
        log.info('aux data required: %s', aux_data)

        #get required data using traversal
        traveler = chainTraversal.ChainTraversal(entry_point=uri)
        data = []
        data.append({'main': traveler.get_all_data()})
        if aux_data is not None:
            searcher = chainSearch.ChainSearch(entry_point=uri)
            for title in aux_data:
                found = searcher.find_first(resource_title=title)
                if found:
                    traveler = chainTraversal.ChainTraversal(entry_point=found[0])
                    data.append({title: traveler.get_all_data()})

        #add geotag data ('lat', 'lon', 'elevation') to each datapoint;
        #we are assuming that all sensors are part of the same device/site
        data = add_geotags(uri, data)

        #call process_data on data from sensor
        publish_vals = globals()[process].process_data(data, metric, unit)

        #publish any data returned from the processor
        if publish_vals is not None:
            searcher = chainSearch.ChainSearch(entry_point=uri)
            found = searcher.find_first(resource_type='device',
                    namespace='http://learnair.media.mit.edu:8000/rels/')
            if found:
                traveler = chainTraversal.ChainTraversal(entry_point=found[0])
                try:
                    traveler.add_and_move_to_resource('Sensor',
                            {'sensor_type': publish_vals[0],
                             'metric': publish_vals[1],
                             'unit': publish_vals[2]})
                    if publish_vals[3] is not None:
                        traveler.safe_add_data(publish_vals[3])
                except Exception:
                    log.warn('publish data from processor malformed')
            else:
                log.warn("can't find device to publish data to")
        else:
            log.info('no values to publish')
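
# For reference, main_spawn only relies on three fields of the crawled
# resource; a sensor resource is assumed to look roughly like this
# (illustrative values):
#
#   {
#       "sensor_type": "alphasense",
#       "metric": "co2",
#       "unit": "ppm",
#       ...
#   }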

def get_attribute(res_json, field):
    '''return res_json[field], or None if the field is missing or res_json
    is None'''
    try:
        return res_json[field]
    except (KeyError, TypeError):
        return None

def get_json_from_uri(uri):
    '''GET uri and return the parsed json, or None if the host is unreachable'''
    try:
        req = requests.get(uri)
        log.info('%s downloaded.', uri)
        return req.json()
    except requests.exceptions.ConnectionError:
        log.warn('URI "%s" unresponsive', uri)
        return None

def check_sensor_type_has_process(res_json, processes):
    '''return the process matching this sensor's sensor_type, or None if no
    process for this sensor exists'''
    try:
        sensor_type = res_json['sensor_type']
    except (KeyError, TypeError):
        log.warn('no sensor_type detected')
        return None
    for process in processes:
        if sensor_type.lower() == process.lower():
            log.info('sensor_type %s matches a process', process)
            return process
    log.info('sensor_type %s does not match any process', sensor_type)
    return None
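
# Each module in lib/processes is expected to expose required_aux_data() and
# process_data() with the signatures used in main_spawn above. A minimal
# plugin sketch (module name and return values are entirely hypothetical):
#
#   # lib/processes/exampleSensor.py
#   def required_aux_data(metric, unit):
#       # titles of extra resources this processor needs, or None
#       return None
#
#   def process_data(data, metric, unit):
#       # return (sensor_type, metric, unit, datapoints) to publish a
#       # 'virtual' sensor, or None to publish nothing
#       return None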

def add_geotags(uri, data):
    '''(1) pull geotag data from device or site. If site exists it's
    stationary; append the geotag to all data. If device, it's not
    stationary; append a geotag to each individual datapoint with some
    tolerance for timing. If neither site nor device has a geotag, throw an
    error. uri is the uri of the sensor.'''
    #TODO: not implemented yet; pass the data through unchanged so that
    # main_spawn doesn't receive None
    log.warn('add_geotags not implemented, returning data without geotags')
    return data

if __name__ == '__main__':
    socket = "tcp://127.0.0.1:5557"
    p1 = create_main_process(socket)
    p2 = create_crawler_process(socket)
    p1.start()
    p2.start()
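    # wait on both worker processes; they run until interrupted (non-daemon
    # children are joined implicitly at exit anyway; this just makes it explicit)
    p1.join()
    p2.join()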