This repository has been archived by the owner on Mar 18, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
server.py
311 lines (249 loc) · 10.9 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
"""
Backend for a HDB++ archive data viewer.
Basic functionality:
- searching for stored attributes
- selecting which attributes to add
- free scrolling/zooming
- two separate Y axes
- Y axes autoscale
- encodes current view in URL
- min/max etc on mouseover
- endpoints for getting raw data in CSV/JSON formats
Missing functionality:
- configure color, etc for each line
- realtime updates (just a periodic reload should be fine)
- show units, labels, ...
- ability to identify lines e.g. by hover
- display errors?
- manual Y axis scaling
- showing write values
- showing archiving history like add/remove/start/stop...
Improvements needed:
- Re-loads the view each time anything changes, maybe possible
to be smarter here? We're caching db results at least.
- UI is very basic
- Plotting is a mess, maybe use canvas instead
- mouseover stuff messy
- Probably using pandas etc inefficiently
Ideas:
- Use websocket to send data as we get it instead of afterwards?
- use dask for lazy parallelization
"""
from concurrent.futures import ThreadPoolExecutor
from collections import OrderedDict
from functools import partial
import fnmatch
import json
import logging
import re
import aiohttp
import asyncio
from aiohttp import web
import aiohttp_cors
from aiohttp_utils import negotiation
from plot import get_extrema, make_axis_images
from hdbpp import HDBPlusPlusConnection
from utils import timer, parse_time_to_naive
from data import get_data, render_data_csv, render_data_json
async def get_health(request):
    """Liveness probe; always replies with a plain-text "OK"."""
    body = "OK"
    return web.Response(text=body)
async def get_controlsystems(hdbpp, request):
    """List the control systems (TANGO hosts) that have archived data.

    Responds with JSON: {"controlsystems": [<sorted names>]}.
    """
    loop = asyncio.get_event_loop()
    # The HDB++ client is synchronous; keep the event loop free by
    # running the lookup in the default thread-pool executor.
    configs = await loop.run_in_executor(None, hdbpp.get_att_configs)
    payload = json.dumps({"controlsystems": sorted(configs)})
    return web.Response(body=payload.encode("utf-8"),
                        content_type="application/json")
async def get_attributes(hdbpp, request):
    """Search for archived attribute names within one control system.

    Query parameters:
        cs     -- control system (TANGO host) to search in
        search -- shell-style glob pattern, e.g. "sys/tg_test/*/ampli*"
        max    -- optional cap on the number of results (default 100)

    Returns JSON: {"attributes": [<sorted matching names>]}.
    """
    cs = request.query["cs"]
    search = request.query["search"]
    # Query-string values arrive as strings; coerce to int so the cap
    # below works (the int default is used when the parameter is absent).
    max_n = int(request.query.get("max", 100))
    # Translate the user's glob pattern into a regex.
    regex = fnmatch.translate(search)
    logging.info("search: %s", search)
    loop = asyncio.get_event_loop()
    # Synchronous database call; run it in the thread pool.
    result = await loop.run_in_executor(None, hdbpp.get_attributes)
    attributes = sorted("%s/%s/%s/%s" % attr
                        for attr in result[cs])
    matches = [attr for attr in attributes
               if re.match(regex, attr, re.IGNORECASE)]
    # Bugfix: "max" was previously parsed but never applied, so the
    # response could grow unboundedly; cap the result list here.
    data = json.dumps({"attributes": matches[:max_n]})
    return web.Response(body=data.encode("utf-8"),
                        content_type="application/json")
async def get_images(hdbpp, request):
    """Produce plot images for a set of attributes, one image per y-axis.

    Expects a JSON body with "attributes", "time_range" (two timestamps),
    "size" and optionally "axes".  Responds with JSON containing the
    rendered "images" and their "descs", compressed when the client
    allows it.
    """
    # TODO: probably makes more sense to send one image per attribute
    # instead. The overhead is pretty low anyway and it makes it
    # possible to do more dynamic stuff on the client like hiding/
    # showing attributes, changing color...
    params = await request.json()
    attributes = params["attributes"]
    raw_range = params["time_range"]
    time_range = [parse_time_to_naive(raw_range[0]),
                  parse_time_to_naive(raw_range[1])]
    size = params["size"]
    axes = params.get("axes")
    logging.debug("Attributes: %r", attributes)
    logging.debug("Time range: %r", time_range)
    logging.debug("Image size: %r", size)
    logging.debug("Axis config: %r", axes)

    # Note: unfortunately, the way things work right now it's not
    # possible to run these steps in parallel. E.g. in order to create
    # the final image, we need all the data since we must know the
    # global max and min values. Luckily, usually the dominating
    # factor will be the database calls, and these can be
    # parallelized.

    # Step 1: pull the archived data out of cassandra.
    with timer("Fetching from database"):
        names = [attribute["name"] for attribute in attributes]
        data = await get_data(hdbpp, names, time_range)

    # Step 2: find the max/min for each y-axis.
    with timer("Calculating extrema"):
        per_axis = get_extrema(attributes, data, time_range, axes)

    # Step 3: render one image per y-axis.
    loop = asyncio.get_event_loop()
    with timer("Making images"):
        # TODO: for now, we're running this in the default thread pool.
        # I haven't benchmarked this, but I'm hoping that this will speed
        # things up (apart from not blocking) since numpy etc can release
        # the GIL. Maybe look into using a process pool?
        render = partial(make_axis_images, per_axis, time_range, size, axes)
        images, descs = await loop.run_in_executor(None, render)

    # Step 4: wrap everything up as a JSON response.
    payload = json.dumps({"images": images, "descs": descs})
    response = web.Response(body=payload.encode("utf-8"),
                            content_type="application/json")
    # With compression, the size of the data goes down even further, almost
    # an order of magnitude. Typical size is a few 10s of kB! It's up to the
    # client to allow it, though.
    response.enable_compression()
    return response
async def post_raw_query(hdbpp, request):
    """Serve raw archived data (CSV or JSON via content negotiation).

    Follows the Grafana JSON datasource request shape: "targets" with
    "cs"/"target" keys and a "range" with "from"/"to" timestamps.
    """
    params = await request.json()
    attributes = ["{cs}/{target}".format(**target)
                  for target in params["targets"]]
    rng = params["range"]
    time_range = [parse_time_to_naive(rng["from"]),
                  parse_time_to_naive(rng["to"])]
    data = await get_data(hdbpp, attributes, time_range,
                          params.get("interval"), restrict_time=True)
    response = negotiation.Response(data=data)
    response.enable_compression()
    return response
async def post_raw_query_http(hdbpp, request):
    """Serve raw archived data (CSV or JSON) for browser clients.

    Same idea as post_raw_query but with a simpler request body:
    a flat "attributes" list and a two-element "time_range".
    """
    params = await request.json()
    start, end = params["time_range"][0], params["time_range"][1]
    time_range = [parse_time_to_naive(start), parse_time_to_naive(end)]
    data = await get_data(hdbpp, params["attributes"], time_range)
    response = negotiation.Response(data=data)
    response.enable_compression()
    return response
async def post_raw_search(hdbpp, request):
    """Search for attributes by name (Grafana JSON API "search" endpoint).

    Expects a JSON body with "cs" (control system) and "target" (a
    substring to look for, matched case-insensitively).  Returns a bare
    JSON list of matching attribute names, sorted.
    """
    # TODO: probably makes sense to merge this with the other
    # attribute search endpoint "get_attributes" above?
    params = await request.json()
    cs = params["cs"]
    # Removed an unused duplicate read of params["target"]; the search
    # term is only needed once.
    search = params["target"]
    # Substring match; note the term is interpolated unescaped, so regex
    # metacharacters in it act as regex (kept for backward compatibility).
    regex = ".*{}.*".format(search)
    logging.info("search: %s", search)
    loop = asyncio.get_event_loop()
    # Synchronous database call; run it in the thread pool.
    result = await loop.run_in_executor(None, hdbpp.get_attributes)
    attributes = sorted("%s/%s/%s/%s" % attr
                        for attr in result[cs])
    matches = [attr
               for attr in attributes
               if re.match(regex, attr, re.IGNORECASE)]
    data = json.dumps(matches)
    return web.Response(body=data.encode("utf-8"),
                        content_type="application/json")
if __name__ == "__main__":
    import argparse
    import configparser
    from middleware import IndexMiddleware

    # parse commandline arguments
    parser = argparse.ArgumentParser(
        description='A web based viewer for HDB++ data')
    parser.add_argument("-c", "--config", type=str, default="hdbppviewer.conf",
                        help="Path to a configuration file")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="Run in debug mode")
    args = parser.parse_args()

    # --debug also raises the log level to DEBUG.
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # load configuration
    # NOTE(review): config.read silently ignores a missing file; a typo'd
    # -c path will surface later as a KeyError/NoSectionError below.
    config = configparser.RawConfigParser()
    config.read(args.config)
    PORT = config.getint("server", "port")
    # Comma-separated list of cassandra contact points.
    CASSANDRA_NODES = config["hdbpp:cassandra"].get("nodes").split(",")
    CASSANDRA_KEYSPACE = config["hdbpp:cassandra"].get("keyspace")
    CASSANDRA_CONSISTENCY_LEVEL = config["hdbpp:cassandra"].get("consistency_level", "ONE")
    # Optional address translation map (e.g. when cassandra is behind NAT);
    # each config item maps an advertised address to a reachable one.
    if config.has_section("hdbpp:cassandra_address_translation"):
        CASSANDRA_ADDRESS_TRANSLATION = dict(
            config.items("hdbpp:cassandra_address_translation"))
    else:
        CASSANDRA_ADDRESS_TRANSLATION = {}
    DATA_CACHE_SIZE = config.getint("server", "data_cache_size")

    # start web server
    app = aiohttp.web.Application(debug=args.debug,
                                  middlewares=[IndexMiddleware()])

    loop = asyncio.get_event_loop()
    # Shared pool used by every run_in_executor call in the handlers above.
    loop.set_default_executor(ThreadPoolExecutor(10))

    hdbpp = HDBPlusPlusConnection(nodes=CASSANDRA_NODES,
                                  keyspace=CASSANDRA_KEYSPACE,
                                  address_map=CASSANDRA_ADDRESS_TRANSLATION,
                                  cache_size=DATA_CACHE_SIZE,
                                  consistency_level=CASSANDRA_CONSISTENCY_LEVEL)

    # Plain (non-CORS) routes; handlers get the shared connection bound
    # via functools.partial.
    app.router.add_route('GET', '/health', get_health)
    app.router.add_route('GET', '/controlsystems',
                         partial(get_controlsystems, hdbpp))
    app.router.add_route('GET', '/attributes',
                         partial(get_attributes, hdbpp))
    app.router.add_route('POST', '/image',
                         partial(get_images, hdbpp))

    # Configure default CORS settings. This is required for e.g. grafana
    # to be able to access the service.
    # TODO: this may be too permissive.
    cors = aiohttp_cors.setup(app, defaults={
        "*": aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers="*",
            allow_headers="*",
        )
    })

    # set up content negotiation so that clients can request
    # e.g. csv data.
    negotiation.setup(app, renderers=OrderedDict([
        ('text/plain', render_data_csv),
        ('text/csv', render_data_csv),
        ('application/json', render_data_json)
    ]))

    # CORS-enabled routes used by external clients such as grafana.
    cors.add(app.router.add_route('POST', '/query',
                                  partial(post_raw_query, hdbpp)))
    cors.add(app.router.add_route('POST', '/search',
                                  partial(post_raw_search, hdbpp)))
    cors.add(app.router.add_route('POST', '/httpquery',
                                  partial(post_raw_query_http, hdbpp)))

    # everything else assumed to be requests for static files
    # maybe add '/static'?
    cors.add(app.router.add_static('/', 'static'))

    handler = app.make_handler(debug=args.debug)
    f = loop.create_server(handler, '0.0.0.0', PORT)
    logging.info("Point your browser to http://localhost:%d/", PORT)
    srv = loop.run_until_complete(f)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Ctrl-C was pressed")
    finally:
        # Orderly shutdown: stop accepting connections, wait for the
        # socket to close, then give in-flight requests 1 s to finish.
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(handler.finish_connections(1.0))
        loop.close()