-
Notifications
You must be signed in to change notification settings - Fork 0
/
buttonhub_climate_handler.py
101 lines (86 loc) · 3.31 KB
/
buttonhub_climate_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from requests import HTTPError
from base_handler import *
import requests
from utils import PERM_ADMIN
# Callback payload attached to the "Update" button; handle_button compares
# incoming button data against this value to trigger a refresh.
UPDATE_LIST = 'up'
class ButtonhubClimateHandler(BaseHandler):
    """Chat handler that reports sensor readings fetched from a Buttonhub instance.

    The handler is enabled only when the ``buttonhub`` config section provides
    both a ``base_url`` and a non-empty ``rooms`` value. Each room is a mapping
    with a ``name`` and an optional ``sensors`` list of ``{'name': ...}``
    entries; each sensor name is looked up as a key in the JSON object
    returned by ``GET {base_url}/state``.
    """

    def __init__(self, config, messenger, service_hub):
        """Read the ``buttonhub`` config section and decide whether the handler is enabled."""
        super().__init__(config, messenger, service_hub, key="buttonhub_climate", name="Buttonhub Climate")
        self.base_url = None
        # Always define ``rooms`` so later attribute access cannot fail when
        # the ``buttonhub`` section is absent.
        self.rooms = None
        if 'buttonhub' in config:
            buttonhub_config = config['buttonhub']
            # A missing base_url disables the handler instead of raising KeyError.
            self.base_url = buttonhub_config.get('base_url')
            self.rooms = buttonhub_config.get('rooms')
        # Store a real boolean rather than whichever operand ``and`` returns.
        self.enabled = bool(self.rooms and self.base_url)

    def help(self, permission):
        """Return the help entry, or ``None`` when disabled or the caller is below admin."""
        if not self.enabled:
            return None
        if permission < PERM_ADMIN:
            return None
        return {
            'summary': "Can check the climate in the flat",
            'examples': ["climate"],
        }

    def matches_message(self, message):
        """Return whether *message* is the ``climate`` command (case/whitespace-insensitive)."""
        if not self.enabled:
            return False
        return message.lower().strip() == 'climate'

    def handle(self, message, **kwargs):
        """Answer the ``climate`` command; requires ``kwargs['permission']`` of at least admin."""
        if kwargs['permission'] < PERM_ADMIN:
            return "Sorry, you can't do this."
        return self.check_climate()

    def check_climate(self):
        """Fetch the Buttonhub state and build the climate reply.

        Returns:
            On success, a dict with ``message`` (the per-room report, or
            ``'No sensor readings found'``) and ``buttons`` (a single
            "Update" button). If the state cannot be fetched or decoded, a
            dict with a failure ``message`` and ``answer`` set to ``'Error!'``.
        """
        # Only the network call is guarded. RequestException covers HTTP error
        # statuses, connection failures and timeouts (the request uses a
        # timeout, and a read timeout is not a ConnectionError). ValueError
        # covers a response body that is not valid JSON.
        try:
            buttonhub_state = self._get_state()
        except (requests.exceptions.RequestException, ValueError):
            return {
                'message': 'Failed to check climate :/',
                'answer': 'Error!',
            }

        room_states = []
        for room in self.rooms or []:
            room_report = self._format_room(room, buttonhub_state)
            if room_report is not None:
                room_states.append(room_report)

        if room_states:
            message = '\n\n'.join(room_states)
        else:
            message = 'No sensor readings found'
        return {
            'message': message,
            'buttons': [
                [
                    {
                        'text': 'Update',
                        'data': UPDATE_LIST,
                    }
                ]
            ],
        }

    @staticmethod
    def _format_room(room, buttonhub_state):
        """Return the report text for one room, or ``None`` if it has no readings."""
        readings = []
        for device in room.get('sensors', []):
            # A sensor that is missing from the state (or mapped to a falsy
            # value) simply contributes no lines.
            device_state = buttonhub_state.get(device['name']) or {}
            temperature = device_state.get('temperature')
            if temperature is not None:
                readings.append(f'- Temperature: {round(temperature, 1)}°C')
            humidity = device_state.get('humidity')
            if humidity is not None:
                readings.append(f'- Humidity: {round(humidity, 1)}%')
            pm25 = device_state.get('pm25')
            if pm25 is not None:
                readings.append(f'- PM2.5: {pm25}')
        if not readings:
            return None
        return room['name'] + ':\n' + '\n'.join(readings)

    def _get_state(self):
        """GET ``{base_url}/state`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection failure,
                timeout, or a non-success HTTP status.
            ValueError: if the response body is not valid JSON.
        """
        response = requests.get(f'{self.base_url}/state', timeout=5)
        response.raise_for_status()
        return response.json()

    def handle_button(self, data, **kwargs):
        """Handle a button press; the "Update" button re-runs the climate check."""
        if data == UPDATE_LIST:
            msg = self.check_climate()
            # Keep the 'Error!' answer that check_climate sets on failure
            # instead of claiming the update succeeded.
            msg.setdefault('answer', "Updated.")
            return msg
        return "Uh oh, something is off"