-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnlenergie.py
executable file
·185 lines (123 loc) · 5.28 KB
/
nlenergie.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#! /usr/bin/python3
# by FvH, released under Apache License v2.0
# either install 'python3-paho-mqtt' or 'pip3 install paho-mqtt'
import datetime
from hasscfg import *
import json
import math
import paho.mqtt.client as mqtt
import pytz
import requests
import threading
import time
import urllib.parse
import urllib.request
import socket
import sys
# Hostname of the MQTT broker this plugin connects to.
mqtt_server = 'mqtt.vm.nurd.space' # TODO: hostname of MQTT server
# Prepended to every topic this plugin subscribes or publishes to.
topic_prefix = 'GHBot/' # leave this as is
# IRC channels in which the command is answered.
channels = ['nurdbottest', 'nurds', 'nurdsbofh'] # TODO: channels to respond to
# Command prefix (as in "!command"); replaced at runtime when the bot
# publishes a new value on the 'from/bot/parameter/prefix' topic.
prefix = '!' # !command, will be updated by ghbot
# Timezone used to render the measurement timestamp.
netherlands_tz = pytz.timezone("Europe/Amsterdam")
# Last JSON reply that parsed successfully; reused by on_message when a
# newer reply cannot be parsed.
prev_j = None
def call_hass(sensor, payload=None):
    """Call the Home Assistant REST API and return the decoded JSON reply.

    `sensor` is the API path appended to `api_url` (for example
    'states/sensor.power'). When `payload` is given it is sent as the request
    body, which makes urllib issue a POST; otherwise a GET is performed.
    `api_url` and `token` are expected to come from `hasscfg`.

    Raises whatever urllib / json raise on network, HTTP or decoding errors
    (including a timeout after 10 seconds).
    """
    url = api_url + sensor
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + token,
    }

    # JSON bodies are UTF-8 by default; ASCII-only payloads encode identically.
    data = payload.encode('utf-8') if payload is not None else None
    req = urllib.request.Request(url, data=data, headers=headers)

    # Bound the wait: this runs inside the MQTT message callback, so an
    # unresponsive server must not block it indefinitely.
    with urllib.request.urlopen(req, timeout=10) as response:
        return json.loads(response.read())
def announce_commands(client):
    """Publish the registration record for the 'nlenergie' command to the bot."""
    registration = 'cmd=nlenergie|descr=Stroomgebruik in Nederland op dit moment'
    client.publish(f'{topic_prefix}to/bot/register', registration)
# IRC colour index used for each known energy source id.
_SOURCE_COLORS = {
    'solar': 8,
    'wind': 12,
    'nuclear': 9,
    'waste': 15,
    'other': 6,
    'fossil': 5,
}


def _fetch_energy_mix():
    """Return the latest energy-mix JSON, or the last good reply on failure.

    A reply is only cached in `prev_j` when it parses and carries a 'time'
    field. If fetching or parsing fails and nothing has been cached yet, the
    original exception is re-raised so the caller can report it.
    """
    global prev_j

    try:
        r = requests.get('http://stofradar.nl:9001/energy/latest', timeout=10, headers={'User-Agent': 'GHBot'})
        fresh = json.loads(r.content)
        if 'time' not in fresh:
            raise KeyError('time')
    except Exception:
        if prev_j is None:
            raise
        return prev_j

    prev_j = fresh
    return fresh


def _render_mix(j):
    """Build the three text fragments for the energy mix in `j`.

    Returns `(details, bars, legend)`: the per-source "id: power MW (x%)"
    list, a coloured bar scaled so the full mix spans about 40 blocks, and
    the colour legend for that bar.
    """
    total = j['total']
    details = []
    bars = []
    legend = []

    for source in j['mix']:
        sid = source['id']
        power = source['power']

        color_index = _SOURCE_COLORS.get(sid)
        if color_index is None:
            # Unknown source: derive a colour in the range 2..14 from its
            # 'color' field.
            # NOTE(review): if 'color' is a str, hash() is salted per process,
            # so this colour can differ between restarts.
            color_index = (abs(hash(source['color']) * 9) % 13) + 2

        perc = power * 100.0 / total
        blocks = math.ceil(power * 40.0 / total)

        details.append(f"\3{color_index}{sid}: {power} MW ({perc:.2f}%)")
        bars.append(f'\3{color_index}' + '\u2588' * blocks)
        legend.append(f"\3{color_index}\u2588 {sid}")

    return ', '.join(details), ''.join(bars), ', '.join(legend)


def on_message(client, userdata, message):
    """Handle one incoming MQTT message.

    - 'from/bot/command' with payload 'register': re-announce our command.
    - 'from/bot/parameter/prefix': store the payload as the new command prefix.
    - anything else: if the text is '<prefix>nlenergie' (optionally followed
      by '-v') and the channel is in `channels` or starts with a backslash,
      publish the current Dutch energy mix as a notice to that channel.

    Any exception while building the reply is reported to the channel instead
    of being raised.
    """
    global prefix

    # Undecodable bytes are replaced rather than raising inside the callback.
    text = message.payload.decode('utf-8', errors='replace')
    topic = message.topic[len(topic_prefix):]

    if topic == 'from/bot/command' and text == 'register':
        announce_commands(client)
        return

    if topic == 'from/bot/parameter/prefix':
        prefix = text
        return

    # Ignore empty messages and anything that is not a prefixed command.
    if not text or not prefix or not text.startswith(prefix):
        return

    command = text[len(prefix):].split(' ')[0]
    if command != 'nlenergie':
        return

    parts = topic.split('/')
    channel = parts[2] if len(parts) >= 3 else 'nurds'  # default channel if can't be deduced

    if not (channel in channels or channel.startswith('\\')):
        return

    response_topic = f'{topic_prefix}to/irc/{channel}/notice'

    try:
        args = text.split()
        verbose = len(args) >= 2 and args[1] == '-v'

        j = _fetch_energy_mix()
        details, bars, legend = _render_mix(j)

        # Convert the epoch timestamp directly into Amsterdam time so the
        # result does not depend on the timezone of the host.
        ts = datetime.datetime.fromtimestamp(j['time'], netherlands_tz)

        if verbose:
            client.publish(response_topic, bars + f' ({ts} / {legend})')
        else:
            # Only the non-verbose reply shows the space's share, so Home
            # Assistant is only queried here.
            # NOTE(review): presumably the sensor reports W and 'total' is in
            # MW, hence the factor 1000000 - confirm.
            usage = float(call_hass('states/sensor.power')['state'])
            share = usage * 100 / (j['total'] * 1000000)
            client.publish(response_topic, f"{details} ({ts}) NURDspace consumption: {share:.20f}%")

    except Exception as e:
        client.publish(response_topic, f'Exception during "nlenergie": {e}, line number: {e.__traceback__.tb_lineno}')
def on_connect(client, userdata, flags, rc):
    """Subscribe to every topic that on_message handles.

    Installed as the client's on_connect callback. `userdata`, `flags` and
    `rc` are accepted for the callback signature but not used.
    """
    client.subscribe(f'{topic_prefix}from/irc/#')
    client.subscribe(f'{topic_prefix}from/bot/command')
    # on_message updates the command prefix from this topic; without the
    # subscription that update would never arrive.
    client.subscribe(f'{topic_prefix}from/bot/parameter/prefix')
def announce_thread(client):
    """Re-announce the command registration every 4.1 seconds, forever.

    Intended as a thread target; it never returns. A failing announcement is
    printed and retried on the next cycle.
    """
    while True:
        try:
            announce_commands(client)
        except Exception as e:
            print(f'Failed to announce: {e}')

        # Sleep outside the try so a failing announcement is retried at the
        # normal pace instead of spinning in a tight loop.
        time.sleep(4.1)
# Client id is built from the host name and the script name; clean_session=False
# asks the broker to keep the session for this id.
client = mqtt.Client(f'{socket.gethostname()}_{sys.argv[0]}', clean_session=False)
client.on_message = on_message
client.on_connect = on_connect
client.connect(mqtt_server, port=1883, keepalive=4, bind_address="")

# Daemon thread: the announcer loops forever, so it must not keep the process
# alive once loop_forever() below stops (for example on Ctrl-C).
t = threading.Thread(target=announce_thread, args=(client,), daemon=True)
t.start()

# Blocks here, dispatching incoming messages to the callbacks set above.
client.loop_forever()