forked from BrewBlox/brewblox-history
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relays.py
176 lines (136 loc) · 5.11 KB
/
relays.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Functionality for persisting eventbus messages to the database
"""
import collections
import collections.abc

from aiohttp import web
from brewblox_service import brewblox_logger, events, features

from brewblox_history import influx
# Separator used to join routing key components and nested dict keys
# into a single flat field key when event data is flattened.
FLAT_SEPARATOR = '/'
# Module-level logger, created under this module's name.
LOGGER = brewblox_logger(__name__)
# Route table holding the HTTP endpoints declared in this module;
# it is added to the application router in setup().
routes = web.RouteTableDef()
class DataRelay(features.ServiceFeature):
    """Writes data from subscribed events to the database.

    After a subscription is set, it will relay all incoming messages.

    When relaying, the data dict is flattened.
    The first part of the routing key is considered the controller name,
    and becomes the InfluxDB measurement name.

    All subsequent routing key components are considered to be sub-set indicators of the controller.
    If the routing key is controller1.block1.sensor1, we consider this as being equal to:

        'controller1': {
            'block1': {
                'sensor1': <event data>
            }
        }

    Data in sub-dicts (including those implied by routing key) is flattened.
    The key name will be the path to the sub-dict, separated by /.

    If we'd received an event where:

        routing_key = 'controller1.block1.sensor1'
        data = {
            settings: {
                'setting': 'setting'
            },
            values: {
                'value': 'val',
                'other': 1
            }
        }

    it would be flattened to:

        {
            'block1/sensor1/settings/setting': 'setting',
            'block1/sensor1/values/value': 'val',
            'block1/sensor1/values/other': 1
        }

    If the event data is not a dict, but a string, it is first converted to:

        {
            'text': <string data>
        }

    This dict is then flattened.

    Lists are flattened as well, using the element index as key.
    Boolean values are stored as 0 / 1.
    """

    def __init__(self, app: web.Application):
        """Look up the event listener and the data writer registered for `app`."""
        super().__init__(app)
        self._listener = events.get_listener(app)
        self._writer = influx.get_data_writer(app)

    def __str__(self):
        return f'<{type(self).__name__} {self._writer}>'

    async def startup(self, _):
        """No-op: the relay has nothing of its own to start."""
        pass

    async def shutdown(self, _):
        """No-op: the relay has nothing of its own to stop."""
        pass

    def _influx_formatted(self, d, parent_key='', sep='/'):
        """Converts a (nested) JSON dict to a flat, influx-ready dict

        - Nested values are flattened, using `sep` as path separator
        - Lists are treated as dicts keyed by element index
        - Boolean values are converted to a number (0 / 1)

        Args:
            d: mapping to flatten. Must provide `.items()`.
            parent_key: path prefix for all keys in `d`. Empty for the top level.
            sep: separator placed between path components.

        Returns:
            A new, single-level dict. Keys are the `sep`-joined paths to each leaf value.
        """
        flat = {}
        for k, v in d.items():
            new_key = f'{parent_key}{sep}{k}' if parent_key else str(k)

            # Lists are handled as if they were dicts with the index as key
            if isinstance(v, list):
                v = dict(enumerate(v))

            # collections.MutableMapping was removed in Python 3.10:
            # the ABC must be referenced through collections.abc
            if isinstance(v, collections.abc.MutableMapping):
                flat.update(self._influx_formatted(v, new_key, sep=sep))
            elif isinstance(v, bool):
                # bool must be checked explicitly: it is stored as 0 / 1
                flat[new_key] = int(v)
            else:
                flat[new_key] = v
        return flat

    async def on_event_message(self,
                               subscription: events.EventSubscription,
                               routing: str,
                               message: dict):
        """Flatten an incoming event message, and schedule it for writing.

        Args:
            subscription: the subscription that received the message. Not used here.
            routing: routing key of the message, with components separated by '.'.
            message: event data. Either a (nested) dict, or a string.

        Nothing is written if the flattened data is empty.
        """
        # Routing is formatted as controller name followed by active sub-index
        # A complete push of the controller state is routed as just the controller name
        routing_list = routing.split('.')

        # Convert textual messages to a dict before flattening
        if isinstance(message, str):
            message = dict(text=message)

        measurement = routing_list[0]
        parent = FLAT_SEPARATOR.join(routing_list[1:])
        data = self._influx_formatted(message, parent_key=parent, sep=FLAT_SEPARATOR)

        LOGGER.debug(f'recv {measurement}, data={bool(data)}')

        if data:
            self._writer.write_soon(measurement=measurement, fields=data)
def setup(app: web.Application):
    """Register a DataRelay feature for `app`, and add this module's routes to its router."""
    relay = DataRelay(app)
    features.add(app, relay)
    app.router.add_routes(routes)
def get_data_relay(app: web.Application) -> DataRelay:
    """Return the DataRelay feature looked up for `app`."""
    relay = features.get(app, DataRelay)
    return relay
def subscribe(app: web.Application, exchange_name: str, routing: str):
    """
    Subscribe to exchange/routing, using the data relay of `app` as message callback.

    Messages received on the subscription are passed to DataRelay.on_event_message().
    """
    relay = get_data_relay(app)
    events.subscribe(
        app,
        exchange_name,
        routing,
        on_message=relay.on_event_message,
    )
@routes.post('/subscribe')
async def add_subscription(request: web.Request) -> web.Response:
    """
    ---
    tags:
    - History
    summary: Add a new event subscription
    description: All messages matching the subscribed topic will be relayed to the database.
    operationId: history.subscribe
    produces:
    - application/json
    parameters:
    -
        in: body
        name: body
        description: subscription
        required: true
        schema:
            type: object
            properties:
                exchange:
                    type: string
                    example: brewblox
                routing:
                    type: string
                    example: controller.#
    """
    # NOTE(review): the docstring above looks like an API spec in YAML that may be
    # parsed at runtime by a documentation tool - confirm before rewording it.

    # The request body is a JSON object that must hold 'exchange' and 'routing'
    body = await request.json()
    subscribe(request.app, body['exchange'], body['routing'])

    # Empty response with default status: the subscription was registered
    return web.Response()