-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_integration.py
executable file
·189 lines (151 loc) · 6.84 KB
/
test_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python3
"""Integration test for hestia.
This script stands up a mosquitto mqtt broker and hestia, and sends signals over mqtt to test hestia's behavior.
"""
import atexit
from decimal import Decimal
from os import environ
from subprocess import Popen
from time import sleep
from gourd import Gourd
from milc import cli
from hestia import config
# Tweak the behavior of this script
MOSQUITTO_SLEEP_TIME=3
HESTIA_SLEEP_TIME=3
# Set some variables to control hestia's behavior
environ['DEDUPE_TIME'] = '0.5'
environ['HEATER_NAME'] = 'test_heater'
environ['MQTT_BASE_TOPIC'] = 'test_hestia'
environ['MQTT_CLIENT_ID'] = 'test_hestia'
environ['MQTT_USER'] = ''
environ['MQTT_PASSWD'] = ''
environ['MQTT_HOST'] = 'localhost'
environ['MQTT_PORT'] = '3883'
environ['MQTT_QOS'] = '1'
environ['MQTT_TIMEOUT'] = '30'
environ['PAYLOAD_HEATER_ON'] = 'HEATER_TURNS_ON'
environ['PAYLOAD_HEATER_OFF'] = 'HEATER_TURNS_OFF'
environ['TEMP_DESIRED'] = '20'
environ['TEMP_VARIANCE'] = '1'
environ['TEMP_MIN'] = '15'
environ['TEMP_MAX'] = '25'
environ['TOPIC_TEMP_PROBE'] = 'probe/temp'
environ['TOPIC_HUMIDITY_PROBE'] = 'probe/humidity'
environ['TOPIC_HEATER_SWITCH'] = 'switch/heater'
environ['TOPIC_TARGET_SET'] = f"{environ['MQTT_BASE_TOPIC']}/{environ['HEATER_NAME']}/target_C/set"
# Objects
app = Gourd(app_name='test_hestia_script', mqtt_host=environ['MQTT_HOST'], mqtt_port=int(environ['MQTT_PORT']), username=environ['MQTT_USER'], password=environ['MQTT_PASSWD'], timeout=int(environ['MQTT_TIMEOUT']), log_mqtt=False)
mqtt_messages = {} # {topic: payload}
processes = {}
@app.subscribe('#')
def mqtt_listen(msg):
if not msg.topic.endswith('/debug'):
cli.log.info('{fg_green}MQTT Message: {fg_cyan}%s: {fg_blue}%s', msg.topic, msg.payload)
mqtt_messages[msg.topic] = msg.payload
@atexit.register
def atexit_cleanup():
"""Cleanup background processes before exiting.
"""
# Tell all processes to exit, in a nice way
cli.log.info('{fg_green}Cleaning up background processes...')
for process in processes.values():
process.terminate()
# Give them at least 5 seconds to stop
for i in range(5):
for process in processes.values():
if process.poll() is None:
sleep(1)
# Tell any remaining processes to exit, non-politely
for process_name, process in processes.items():
if process.poll() is None:
cli.log.error('{fg_red}%s did not terminate, killing...', processs_name.title())
process.kill()
def check_procs():
"""Returns True if all background processes are running.
"""
for process_name, process in processes.items():
process_status = process.poll()
if process_status is not None:
cli.log.error('{fg_red}%s is no longer running! errno %s', process_name.title(), process_status)
return False
return True
def check_temps(temp_list, on_temps, off_temps):
"""Iterate through a list of temperatures and make sure the heater turns on and off as needed.
"""
success = True
for temp in temp_list:
cli.log.info('{fg_green}Sending temperature {fg_blue}%s', temp)
app.publish(environ['TOPIC_TEMP_PROBE'], str(temp))
sleep(0.6)
if not check_procs():
success = False
break
for switch_action, switch_temps in [['ON', on_temps], ['OFF', off_temps]]:
if temp in switch_temps:
if mqtt_messages.get(environ['TOPIC_HEATER_SWITCH']) == environ[f'PAYLOAD_HEATER_{switch_action}']:
cli.log.info('{fg_blue}Sucessfully turned {fg_red}%s {fg_blue}heater!', switch_action)
else:
cli.log.error('{fg_red}Did not turn {fg_blue}%s {fg_red}heater! TOPIC_HEATER_SWITCH=%s', switch_action, mqtt_messages.get(environ['TOPIC_HEATER_SWITCH']))
success = False
if environ['TOPIC_HEATER_SWITCH'] in mqtt_messages:
del mqtt_messages[environ['TOPIC_HEATER_SWITCH']]
return success
def sweep_temp_readings(cli, target_temp):
"""Walk the temperature up, down, and up again.
"""
target_temp = Decimal(target_temp)
temp_variance = Decimal(environ['TEMP_VARIANCE'])
lower_target_temp = target_temp - temp_variance
success = True
up_lowest_temp = lower_target_temp - Decimal('0.2')
up_highest_temp = target_temp + Decimal('0.3')
up_range = (int(up_lowest_temp*10), int(up_highest_temp*10))
up_range_list = [i/10 for i in range(*up_range)]
up_on = [up_lowest_temp+Decimal('0.1'), lower_target_temp]
up_off = [up_highest_temp-Decimal('0.1')]
down_lowest_temp = lower_target_temp - Decimal('0.3')
down_highest_temp = target_temp + Decimal('0.1')
down_range = (int(down_highest_temp*10), int(down_lowest_temp*10), -1)
down_range_list = [i/10 for i in range(*down_range)]
down_on = [down_lowest_temp+Decimal('0.1')]
down_off = [down_highest_temp, down_highest_temp-Decimal('0.1')]
cli.log.info("{fg_green}sweep_temp_readings: {fg_reset}Going up: {fg_blue}%s", ', '.join(map(str, up_range_list)))
if not check_temps([Decimal(i)/10 for i in range(*up_range)], up_on, up_off):
success = False
cli.log.info("{fg_green}sweep_temp_readings: {fg_reset}Going down: {fg_blue}%s", ', '.join(map(str, down_range_list)))
if not check_temps([Decimal(i)/10 for i in range(*down_range)], down_on, down_off):
success = False
return success
@cli.entrypoint('Test hestia')
def main(cli):
success = True
## Start the daemons we'll need
cli.log.info('{fg_magenta}Starting mosquitto in the background and waiting {fg_green}%s {fg_magenta}seconds for initialization', MOSQUITTO_SLEEP_TIME)
args = ['mosquitto', '-c', './test_mosquitto.conf']
processes['mosquitto'] = Popen(args)
sleep(MOSQUITTO_SLEEP_TIME)
cli.log.info('{fg_magenta}Starting MQTT listener and waiting {fg_green}%s {fg_magenta}seconds for initialization', MOSQUITTO_SLEEP_TIME)
app.loop_start()
sleep(MOSQUITTO_SLEEP_TIME)
cli.log.info('{fg_magenta}Starting hestia in the background and waiting {fg_green}%s {fg_magenta}seconds for initialization', HESTIA_SLEEP_TIME)
args = ['gourd', 'hestia:app']
processes['hestia'] = Popen(args)
sleep(HESTIA_SLEEP_TIME)
## Run tests here
cli.log.info('{fg_magenta}Basic temperature sweep at default desired temp')
if not sweep_temp_readings(cli, environ['TEMP_DESIRED']):
cli.log.error('{fg_red}Basic functionality test failed!')
success = False
cli.log.info('{fg_magenta}Set the target temp to {fg_red}19 {fg_magenta}and rerun the sweep')
app.publish(environ['TOPIC_TARGET_SET'], '19')
sleep(1)
if not sweep_temp_readings(cli, '19'):
cli.log.error('{fg_red}New target temperature test failed!')
success = False
# Report status
return success
if __name__ == '__main__':
if cli():
exit(0)
exit(1)