This repository has been archived by the owner on Apr 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
hub.py
185 lines (131 loc) · 5.68 KB
/
hub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import time
import sqlite3
import serial
import math
import json
# GPIO imports
import RPi.GPIO as GPIO
from node.Adafruit_BME280 import *
from node import camera
# DB import
from db.sql_db_helper import DBHelper as SqlHelper
# Root directory handed to camera.captureImage() whenever the hub takes a picture.
IMAGE_FOLDER_ROOT_DIR = './images/sensor/'
class Hub:
    """Fog hub: reads local sensors, talks to radio nodes and stores readings.

    The hub drives a red LED through RPi.GPIO, reads temperature from a
    BME280 sensor, optionally exchanges newline-terminated text commands with
    micro:bit node devices over a serial port, and inserts each combined
    reading into the SQL database helper.
    """

    def __init__(self, hub_name, connect_radio, expected_nodes=2) -> None:
        """Set up GPIO, the BME280 sensor, the optional radio link and the DB.

        Args:
            hub_name: Name stored under the 'devicename' key of each reading.
            connect_radio: When truthy, open the serial port and repeat
                reset + handshake until the expected number of nodes enrols.
            expected_nodes: Number of node devices that must be enrolled
                before initialisation continues (defaults to the previously
                hard-coded value of 2).

        Raises:
            Exception: Wraps any error raised during initialisation.
        """
        print("********Initiating hub:{}...********".format(hub_name))
        try:
            self.hub_name = hub_name
            # GPIO: pin numbers below use the board (physical) numbering mode.
            GPIO.setmode(GPIO.BOARD)
            # LED: switch it on and straight off again at start-up.
            self.ledRedPin = 40
            self.__toggle_fog_led(on=True)
            self.__toggle_fog_led(on=False)
            # connect to BME
            self.bme = BME280(t_mode=BME280_OSAMPLE_8,
                              p_mode=BME280_OSAMPLE_8,
                              h_mode=BME280_OSAMPLE_8)
            # connect to radio
            self.connected_radio = connect_radio
            # Defined even without a radio so the attribute always exists.
            self.connected_node_devices = []
            if connect_radio:
                self.ser = serial.Serial(port='/dev/ttyACM0',
                                         baudrate=115200,
                                         timeout=1)
                # Keep resetting and handshaking until exactly the expected
                # number of node devices has enrolled.
                while len(self.connected_node_devices) != expected_nodes:
                    print("resetting radio...")
                    self.radio_reset()
                    time.sleep(5)
                    print("handshaking...")
                    self.radio_handshake()
                    time.sleep(5)
            # connect to db
            self.sql_db = SqlHelper(db_name='fog.db')
            # alarm
            self.alarm_activated = False
        except Exception as err:
            # Same exception type and message as before; the cause is now
            # chained explicitly so the original traceback stays attached.
            raise Exception(
                "********INIT FAILED: {}********".format(err)) from err

    ############################ START OF FOG COMMANDS ############################
    def monitor_baby_readings(self):
        """Collect one combined reading, store it in the DB and return it.

        The reading holds the node sensor values (only when the radio is
        connected), the hub temperature formatted to one decimal place, the
        hub name, the value returned by camera.captureImage() and a mocked
        right distance.

        Returns:
            dict: The combined sensor data that was inserted into the DB.
        """
        print("Getting reading from node devices...")
        combined_sensor_data = {}
        if self.connected_radio:
            combined_sensor_data = self.__get_data_from_node('sensor=all')
        # Keep the fractional part: truncating to int first made the
        # one-decimal formatting below always end in ".0".
        temperature = float(self.bme.read_temperature())
        combined_sensor_data['temperature'] = '{0:0.1f}'.format(temperature)  # 1 d.p
        combined_sensor_data['devicename'] = self.hub_name
        # NOTE(review): the original comment says the camera returns the image
        # as a blob (large binary object) - confirm against camera.captureImage.
        combined_sensor_data['image'] = camera.captureImage(IMAGE_FOLDER_ROOT_DIR)
        # mock right distance
        combined_sensor_data["rightdistance"] = 50
        print("Combined_sensor_data sensor reading: {}".format(
            combined_sensor_data))
        self.sql_db.insert_sensor_data(combined_sensor_data)
        return combined_sensor_data

    ############################ START OF NODE COMMANDS ############################
    def radio_reset(self):
        """Send the 'reset' command over the serial link."""
        self.__send_command('reset')

    def radio_handshake(self):
        """Send 'handshake' and record the node devices named in the reply.

        The reply is expected to look like ``enrol=deviceNameA,deviceNameB``.
        A reply without '=' or with an empty device list leaves
        ``connected_node_devices`` unchanged instead of raising.
        """
        print("Connecting fog to micro:bit devices...")
        self.__send_command('handshake')
        reply = self.__read_nonempty_response()
        print(reply)
        fields = reply.split('=')
        if len(fields) < 2 or not fields[1]:
            # Malformed reply or nobody enrolled: the caller may retry.
            return
        self.connected_node_devices = fields[1].split(',')
        for mb in self.connected_node_devices:
            print('Connected to micro:bit device {}...'.format(mb))

    def activate_alarm(self):
        """Mark the alarm active, light the hub LED and notify the nodes."""
        self.alarm_activated = True
        self.__toggle_fog_led(True)
        # The serial port only exists when the radio was connected.
        if self.connected_radio:
            self.__send_command("cmd:alarm=on")
        # TODO: add command to on node alarm

    def deactivate_alarm(self):
        """Mark the alarm inactive, switch the hub LED off and notify the nodes."""
        self.alarm_activated = False
        self.__toggle_fog_led(False)
        # The serial port only exists when the radio was connected.
        if self.connected_radio:
            self.__send_command("cmd:alarm=off")
        # TODO: add command to off node alarm

    def __get_data_from_node(self, commandToTx: str) -> dict:
        """Request sensor values from the nodes and return them as a dict.

        Sends ``cmd:<commandToTx>`` and parses a comma-separated reply of
        ``key=value`` items. If the reply has fewer items than there are
        connected node devices, the request is sent again and the fresh reply
        is used (previously the retried result was discarded).

        Args:
            commandToTx: Command text appended after the ``cmd:`` prefix.

        Returns:
            dict: Maps each key to its value string, both as received.
        """
        # Loop rather than recurse so repeated retries cannot exhaust the stack.
        while True:
            print("Getting {}...".format(commandToTx))
            self.__send_command('cmd:' + commandToTx)
            reply = self.__read_nonempty_response()
            readings = reply.split(',')
            sensor_values = {}
            for reading in readings:
                parts = reading.split("=")
                if len(parts) < 2:
                    # No '=' in this item: skip it instead of raising IndexError.
                    print("ignoring malformed reading: {}".format(reading))
                    continue
                sensor_values[parts[0]] = parts[1]
            print(readings)
            if len(readings) >= len(self.connected_node_devices):
                return sensor_values
            # did not get data from all node devices, retry
            print("did not receive feedback from all nodes, retrying...")

    def __read_nonempty_response(self) -> str:
        """Poll the serial link until a non-empty line arrives and return it."""
        response = self.__wait_response()
        while not response:
            time.sleep(0.1)
            response = self.__wait_response()
        return response

    def __toggle_fog_led(self, on):
        """Configure the LED pin as an output and drive it with ``not on``.

        The pin receives the inverse of ``on`` (presumably the LED is wired
        active-low - confirm against the hardware).
        """
        GPIO.setup(self.ledRedPin, GPIO.OUT)
        GPIO.output(self.ledRedPin, not on)

    def __send_command(self, command):
        """Write ``command`` plus a trailing newline to the serial port."""
        command = command + '\n'
        self.ser.write(str.encode(command))

    def __wait_response(self) -> str:
        """Read one line from the serial port, decoded as UTF-8 and stripped."""
        response = self.ser.readline()
        response = response.decode('utf-8').strip()
        return response

    def __del__(self):
        """Release the GPIO pins; cleanup is best-effort."""
        try:
            GPIO.cleanup()
        except Exception:
            # A finalizer cannot propagate errors, and at interpreter shutdown
            # module globals such as GPIO may already be gone.
            pass