-
Notifications
You must be signed in to change notification settings - Fork 0
/
buttonhub_battery_handler.py
133 lines (111 loc) · 4.13 KB
/
buttonhub_battery_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from requests import HTTPError
from base_handler import *
import datetime
import requests
from utils import PERM_ADMIN
UPDATE_LIST = 'up'
class ButtonhubBatteryHandler(BaseHandler):
def __init__(self, config, messenger, service_hub):
super().__init__(config, messenger, service_hub, key="buttonhub_battery", name="Buttonhub Batteries")
if 'buttonhub' in config:
self.base_url = config['buttonhub']['base_url']
self.config = config['buttonhub']['batteries']
self.enabled = True
else:
self.base_url = None
self.enabled = False
def help(self, permission):
if not self.enabled:
return
if permission < PERM_ADMIN:
return
return {
'summary': "Can check the battery status of devices connected to Buttonhub",
'examples': ["batteries"],
}
def matches_message(self, message):
if not self.enabled:
return
return message.lower().strip() == 'batteries'
def handle(self, message, **kwargs):
if kwargs['permission'] < PERM_ADMIN:
return "Sorry, you can't do this."
return self.check_batteries()
def check_batteries(self):
try:
battery_status = self._get_battery_status()
if not battery_status:
message = 'No devices found'
else:
message = '\n'.join([
'{}: {}%'.format(s['device'], s['battery'])
for s in battery_status
])
return {
'message': message,
'buttons': [
[
{
'text': 'Update',
'data': UPDATE_LIST,
}
]
],
}
except HTTPError:
return {
'message': 'Failed to check batteries :/',
'answer': 'Error!',
}
except requests.exceptions.ConnectionError:
return {
'message': 'Failed to check batteries :/',
'answer': 'Error!',
}
def _get_battery_status(self):
battery_status = []
buttonhub_state = self._get_state()
for device, value in buttonhub_state.items():
battery = value.get('battery')
if battery:
device_name = device
if '/' in device_name:
device_name = device_name.split('/')[1]
battery_status.append(
{'device': device_name, 'battery': int(battery)}
)
battery_status.sort(key=lambda entry: entry['device'])
return battery_status
def _get_state(self):
response = requests.get(f'{self.base_url}/state', timeout=5)
response.raise_for_status()
return response.json()
def handle_button(self, data, **kwargs):
if data == UPDATE_LIST:
msg = self.check_batteries()
msg['answer'] = "Updated."
return msg
return "Uh oh, something is off"
def run_periodically(self, db):
if not self.enabled:
return
now = datetime.datetime.now()
if now.hour != self.config['hour'] or now.minute != 0:
return
threshold = self.config['warning_threshold']
battery_status = self._get_battery_status()
endangered_devices = [
device for device in battery_status if device['battery'] <= threshold
]
if not endangered_devices:
return
send = self._messenger.send_message
if len(endangered_devices) == 1:
device = endangered_devices[0]
message = "{} is low on battery ({}%)".format(device['device'], device['battery'])
else:
message = "There are devices low on battery:"
for device in endangered_devices:
message = message + "\n- {} ({}%)".format(device['device'], device['battery'])
for user in self.config['users']:
send(message, recipient_id=user)