-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_temp_module.py
106 lines (89 loc) · 2.87 KB
/
test_temp_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from unittest.mock import Mock
import pytest
from shedpi_components.temperature_probe import (
TempProbe,
)
from shedpi_hub_dashboard.models import DeviceModuleReading
from shedpi_hub_dashboard.tests.utils.factories import (
DeviceModuleFactory,
)
from standalone_modules.shed_pi_module_utils.data_submission import (
ReadingSubmissionService,
)
from standalone_modules.temperature_module.device_protocol import DeviceProtocol
def test_temp_probe_reading_happy_path(temp_probe_path):
    """A raw readout of ["YES", "t=12345"] is reported by read_temp as 12.345."""
    # FIXME: Get the actual readout from the modules
    sensor = TempProbe(submission_service=Mock())
    raw_lines = ["YES", "t=12345"]
    # Replace the raw read with canned lines so the test controls the input.
    sensor.read_temp_raw = Mock(return_value=raw_lines)

    assert sensor.read_temp() == 12.345
def test_temp_probe_reading_invalid_reading(temp_probe_path):
    """
    A non-numeric value after "t=" makes read_temp raise ValueError.

    TODO:
    - Find what a real invalid reading looks like
    """
    # FIXME: Get the actual readout from the modules
    sensor = TempProbe(submission_service=Mock())
    malformed_lines = ["YES", "t=xxxxxx"]
    # Replace the raw read with canned lines so the test controls the input.
    sensor.read_temp_raw = Mock(return_value=malformed_lines)

    with pytest.raises(ValueError):
        sensor.read_temp()
def test_temp_probe_reading_invalid_reading_missing_expected_params(temp_probe_path):
    """
    YES is missing from the first raw reading in the data feed.

    The probe is fed two raw readings in sequence: the first lacks the "YES"
    marker, the second carries it. read_temp is expected to return the value
    of the second reading, having called read_temp_raw exactly twice.
    """
    # FIXME: Get the actual readout from the modules
    try_1 = [
        "SOMETHING",
        "t=1000",
    ]
    try_2 = [
        "YES",
        "t=2000",
    ]
    probe = TempProbe(submission_service=Mock())
    # side_effect hands out one canned reading per call, in order.
    probe.read_temp_raw = Mock(side_effect=[try_1, try_2])

    temp = probe.read_temp()

    assert temp == 2.0
    # The first reading is discarded as invalid, so the raw read must have
    # happened twice. Without `assert` this comparison was a no-op statement.
    assert probe.read_temp_raw.call_count == 2
@pytest.mark.django_db
def test_temp_logger(temp_probe_path, live_server):
    """
    Run the device protocol against the live server and check that exactly
    one reading is stored for the temperature device module.
    """
    # Point the submission service at the test server
    reading_service = ReadingSubmissionService()
    reading_service.base_url = live_server.url

    # Build the protocol on top of that service
    protocol = DeviceProtocol(submission_service=reading_service)
    # No delay between submissions, so the test ends instantly
    protocol.submission_delay = 0
    # stop() answers False, then True; presumably this lets the run loop
    # execute once before exiting -- confirm against DeviceProtocol.run
    protocol.stop = Mock(side_effect=[False, True])

    # Register a device module whose schema describes a temperature reading
    reading_schema = {
        "$id": "https://example.com/person.schema.json",
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Reading",
        "type": "object",
        "properties": {
            "temperature": {"type": "string", "description": "The Temperature"},
        },
    }
    module_record = DeviceModuleFactory(schema=reading_schema)

    # Wire the protocol's probe to that module and feed it a canned readout
    protocol.temp_probe.device_id = module_record.id
    protocol.temp_probe.read_temp_raw = Mock(return_value=["YES", "t=12345"])

    protocol.run()

    # Check that the data was submitted
    stored_readings = DeviceModuleReading.objects.filter(device_module=module_record)
    assert stored_readings.count() == 1