-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathenviroplus_reader.py
65 lines (54 loc) · 2.16 KB
/
enviroplus_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time
from subprocess import PIPE, Popen
from bme280 import BME280
from enviroplus import gas
from pms5003 import PMS5003, ReadTimeoutError
try:
from smbus2 import SMBus
except ImportError:
from smbus import SMBus
try:
# Transitional fix for breaking change in LTR559
from ltr559 import LTR559
ltr559 = LTR559()
except ImportError:
import ltr559
class EnviroPlusReader:
    """Take one-shot snapshots of the Enviro+ board sensors.

    Wraps the BME280 (temperature, pressure, humidity), the gas sensor,
    the LTR559 light sensor and the PMS5003 particulate sensor, and
    returns their readings together as a single flat dictionary.
    """

    def __init__(self, comp_factor: float = 1.2) -> None:
        """Open SMBus 1 and create the sensor driver instances.

        Args:
            comp_factor: Divisor applied to the difference between the
                CPU temperature and the raw BME280 temperature when
                compensating the temperature in ``read_values``. Must
                not be zero, since it is used as a divisor.
        """
        self.bus = SMBus(1)
        # Create BME280 (temp, humidity and pressure sensor) instance
        self.bme280 = BME280(i2c_dev=self.bus)
        # Create PMS5003 dust sensor instance
        self.pms5003 = PMS5003()
        self.comp_factor = comp_factor

    def read_values(self) -> dict:
        """Read every sensor once and return the results.

        Returns:
            A dict with the keys ``temperature``, ``pressure``,
            ``humidity``, ``oxidising``, ``reducing``, ``nh3``, ``lux``,
            ``P2.5``, ``P10``, ``P1.0`` and ``ts`` (``time.time_ns()``
            taken after all sensors have been read).

        Raises:
            ReadTimeoutError: If the PMS5003 times out again after the
                single reset-and-retry.
            ValueError: If the CPU temperature cannot be parsed.
        """
        values = {}

        # The raw BME280 temperature is corrected by subtracting a fraction
        # of the CPU/sensor temperature difference.
        # NOTE(review): presumably this offsets heating of the sensor by the
        # nearby CPU - confirm against the board documentation.
        cpu_temp = self.get_cpu_temperature()
        raw_temp = self.bme280.get_temperature()
        comp_temp = raw_temp - ((cpu_temp - raw_temp) / self.comp_factor)
        values['temperature'] = round(comp_temp, 2)
        values['pressure'] = round(self.bme280.get_pressure(), 2)
        values['humidity'] = round(self.bme280.get_humidity(), 2)

        # Gas readings are scaled down by 1000 before rounding.
        # NOTE(review): presumably ohms -> kilo-ohms; verify against the
        # enviroplus.gas documentation.
        data = gas.read_all()
        values['oxidising'] = round(data.oxidising / 1000, 4)
        values['reducing'] = round(data.reducing / 1000, 4)
        values['nh3'] = round(data.nh3 / 1000, 4)

        values['lux'] = round(ltr559.get_lux(), 2)

        values.update(self._read_particulates())

        values['ts'] = time.time_ns()
        return values

    def _read_particulates(self) -> dict:
        """Read the PMS5003 once, retrying a single time after a timeout.

        Returns:
            A dict with the keys ``P2.5``, ``P10`` and ``P1.0`` (in that
            order), each holding ``pm_ug_per_m3`` for that size.

        Raises:
            ReadTimeoutError: If the read after the reset also times out.
        """
        try:
            pm_values = self.pms5003.read()
        except ReadTimeoutError:
            # One recovery attempt only: reset the sensor and read again.
            # A second timeout is deliberately left to propagate.
            self.pms5003.reset()
            pm_values = self.pms5003.read()
        return {
            'P2.5': pm_values.pm_ug_per_m3(2.5),
            'P10': pm_values.pm_ug_per_m3(10),
            'P1.0': pm_values.pm_ug_per_m3(1.0),
        }

    @staticmethod
    def get_cpu_temperature() -> float:
        """Return the CPU temperature reported by ``vcgencmd measure_temp``.

        Raises:
            OSError: If the ``vcgencmd`` executable cannot be started.
            ValueError: If the command output cannot be parsed.
        """
        with Popen(['vcgencmd', 'measure_temp'], stdout=PIPE,
                   universal_newlines=True) as process:
            output, _error = process.communicate()
        return EnviroPlusReader._parse_cpu_temperature(output)

    @staticmethod
    def _parse_cpu_temperature(output: str) -> float:
        """Extract the number between the first ``=`` and the last ``'``.

        Args:
            output: Text printed by ``vcgencmd measure_temp``; the parser
                expects something shaped like ``temp=47.2'C``.

        Raises:
            ValueError: If either delimiter is missing or the text between
                them is not a number. The message includes the raw output
                so a failing command is easy to diagnose.
        """
        try:
            return float(output[output.index('=') + 1:output.rindex("'")])
        except ValueError as err:
            raise ValueError(
                f"unexpected output from 'vcgencmd measure_temp': {output!r}"
            ) from err