-
Notifications
You must be signed in to change notification settings - Fork 0
/
weather_logger.py
119 lines (106 loc) · 3.7 KB
/
weather_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import logging
import os
import time
from dotenv import load_dotenv
import asyncio
from aiohttp import ClientSession
import json
from mqtt_async import AsyncMqttClient
class AsyncHttpClient:
async def fetch(self, url):
async with ClientSession() as session:
async with session.get(url) as response:
return await response.read()
class WeatherLogger(AsyncMqttClient, AsyncHttpClient):
def __init__(
self, delay=3600, conf=None, fiware=None, entity=None, log_json=False
) -> None:
super().__init__()
self.delay = delay
self.conf = self.load_conf(conf)
self.FIWARE = fiware
self.ENTITY = entity
def load_conf(self, conf_file):
with open(conf_file) as f:
conf = json.load(f)
f.close()
return conf
async def retrieve(self, i):
c = self.conf
vs_loc = f'lat={c["vs_lat"][i]}&lon={c["vs_lon"][i]}'
postfix = "&exclude=current,minutely,daily,alerts&appid="
url = f'{c["prefix"]}/onecall?{vs_loc}{postfix}{c["appid"]}'
response = await self.fetch(url)
response = json.loads(response)
hr = response["hourly"][0] # prendo il primo (non il secondo)
timestamp = hr["dt"] * 1000
values = []
for key in c["keys"]:
value = None
if key in hr:
value = hr[key]
if key == "temp":
value -= 273.15
elif key == "rain":
value = value["1h"] # TODO; Verifare la pioggia 1h se giusto
elif key == "rain":
value = 0
else:
logging.error(
f"Key not found {key}"
)
values.append(value)
return {
"timestamp": timestamp,
"controlledProperty": c["vars"],
"value": values,
"units": c["units"],
}
async def run(self):
c = self.conf
while True:
for i, ws_name in enumerate(c["vs_name"]):
logging.info(f"Retrieving weather data for {ws_name}")
result = await self.retrieve(i)
# raise Exception()
lat_lon = [round(c["vs_lon"][i], 7), round(c["vs_lat"][i], 7)]
device = f"WeatherStation_v{i}"
dev_ = "Device:"
ptopic = f"{self.FIWARE}{self.ENTITY}{dev_}{device}/attrs"
id = f"{self.ENTITY}{dev_}{device}"
payload = {
"id": id,
"name": device,
"areaServed": ("urn:ngsi-ld:AgriFarm:" + c["vs_area"][i]),
"location": {"coordinates": lat_lon, "type": "Point"},
}
payload.update(result)
logging.debug(json.dumps(payload, indent=2))
await self.publish(ptopic, json.dumps(payload))
await asyncio.sleep(self.delay)
async def main():
load_dotenv()
FIWARE = os.getenv("FIWARE")
ENTITY = os.getenv("ENTITY")
CLIENT_ID = os.getenv("CLIENT_ID") + "_weatherLogger"
weatherLogger = WeatherLogger(
delay=3600,
conf="openweathermap_conf.json",
fiware=FIWARE,
entity=ENTITY,
log_json=True,
)
await weatherLogger.listen(
host=os.getenv("MQTTS_BROKER"),
port=int(os.getenv("MQTTS_PORT")),
username=os.getenv("MQTTS_USERNAME"),
password=os.getenv("MQTTS_PASSWORD"),
tls=True,
tls_insecure=True,
client_id=CLIENT_ID,
notify_birth=True
)
await weatherLogger.run()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())