-
Notifications
You must be signed in to change notification settings - Fork 1
/
purpleair.v6.py
94 lines (77 loc) · 2.5 KB
/
purpleair.v6.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#
# Pulling data from Purpleair and sending to Graphite by Python
#
# Core original code written by Durren Shen @durrenshen
# in a blog at https://www.wavefront.com/weather-metrics/
#
# Cleaned up by Bill Roth @BillRothVMware, and [email protected] and rewritten.
#
import requests
import json
import socket
import syslog
import sys
import time
import re
import math
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
def write_measurement(measurements : list):
client = InfluxDBClient(url="http://192.168.0.1:9998", token='CODE==', org='cottle')
write = client.write_api(write_options=SYNCHRONOUS)
try:
res = write.write(bucket="air", org="cottle", record=measurements,write_precision="s")
except Exception as e:
syslog.syslog(syslog.LOG_ERROR,str(e))
sys.exit(1)
client.close()
def print_output():
#
# Below assumes the proxy is running on the same system. YMMV
#
# Get epoch time
#
epochtime = time.time()
#
#
key= {'X-API-Key' : 'CODE'}
try:
response = requests.get('https://api.purpleair.com/v1/sensors/60664', headers=key)
except Exception as e:
print(e)
sys.exit(1)
if response.status_code != 200:
if response.status_code in (429,500):
syslog.syslog(f"Error received {response.status_code}. Exiting quietly")
sys.exit(0) # quietly ignore
else:
print(f'Error received: {response.status_code} : Text = {response.reason} {response.text}')
sys.exit(1)
parsed_json = response.json()
measurements = [{
'measurement' : 'air_quality',
'tags': {'host' : socket.gethostname(),
'where' : 'cottle' },
'fields' : {'epochtime' : str(int(epochtime))}
}]
for i in parsed_json['sensor']:
if i in ('stats', 'stats_a', 'stats_b'):
pass
else:
measurements[0]['fields'][i] = parsed_json['sensor'][i]
measurements[0]['time'] = parsed_json['data_time_stamp']
write_measurement(measurements)
syslog.syslog(syslog.LOG_INFO, "Air quality logged at " + str(epochtime))
# aqi = calc_aqi(pm2_5_atm)
# syslog.syslog('Air Quality logged at ' + sepochtime)
def calc_aqi(inp):
x = float(inp)
#
# attempted curve fit of AQI functoin. negative exponential based on https://www.airnow.gov/aqi/aqi-calculator/
#
ret = (-3E-13 * x**6) + (8E-10 * x**5) - (6E-07 * x**4) + (0.0002 * x**3) - (0.0434 * x**2) + (4.2234*x) + 1.2597
return ret
def main():
print_output()
if __name__ == "__main__":
main()