-
Notifications
You must be signed in to change notification settings - Fork 0
/
AMG8833AdaptiveUploader.py
198 lines (151 loc) · 6.03 KB
/
AMG8833AdaptiveUploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import time
import busio
import board
import adafruit_amg88xx
import json
import numpy as np
from scipy.interpolate import griddata
from colour import Color
import math
import matplotlib.pyplot as plt
from subprocess import call
import datetime
SAMPLE_RATE = 1 # Hz
SAMPLES_BETWEEN_TIME_CHECK = 10 # check time every 10th loop
class AMG8833AdaptiveUploader:
"""
Reads and saves the data from the IR camera
"""
def __init__(self):
# Init I2C and instantiate sensor on that bus
i2c = busio.I2C(board.SCL, board.SDA)
self.amg = adafruit_amg88xx.AMG88XX(i2c)
# Period at which IR sensor is sampled
self.period = 1 / SAMPLE_RATE
# Track Mode
self.track = False
self.alerted = True
self.temp_img = []
# Time at boot-up for output file
self.current_time = datetime.datetime.now()
current_year = self.current_time.year - 2000 # Dont' care about millenium
current_month = self.current_time.month
current_day = self.current_time.day
current_hour = self.current_time.hour
# Add leading zeroes to help with sorting filenames later
if current_hour < 10:
edh = '0'
else:
edh = ''
if current_day < 10:
edd = '0'
else:
edd = ''
if current_month < 10:
edm = '0'
else:
edm = ''
# Output file
self.filename = 'data_' + str(current_year) + edm + str(current_month) + edd + str(current_day) + edh + \
str(current_hour) + ".txt"
self.file = open('/home/pi/Desktop/Projects/SIOT_Project/output_data/' + self.filename, 'w')
while True:
# Check the new time
self.check_time()
for i in range(SAMPLES_BETWEEN_TIME_CHECK):
self.append_data()
def check_time(self):
"""
Checks the current time and closes the file and calls for upload when hour has changed
"""
# Get new time
new_time = datetime.datetime.now()
# If hour has incremented or is back at 0 -> New day has begun
if new_time.hour > self.current_time.hour or new_time.day > self.current_time.day or new_time.month \
> self.current_time.month or new_time.year > self.current_time.year:
new_year = new_time.year - 2000 # Don't care about the millenium
new_month = new_time.month
new_day = new_time.day
new_hour = new_time.hour
# Add leading zeroes to help with sorting filenames later
if new_hour < 10:
edh = '0'
else:
edh = ''
if new_day < 10:
edd = '0'
else:
edd = ''
if new_month < 10:
edm = '0'
else:
edm = ''
# Close file and upload it
self.file.close()
self.upload_file()
# Create a new file
self.filename = 'data_' + str(new_year) + edm + str(new_month) + edd + str(new_day) + edh + \
str(new_hour) + ".txt"
self.file = open('/home/pi/Desktop/Projects/SIOT_Project/output_data/' + self.filename, 'w')
# Make new_time to current_time
self.current_time = new_time
def upload_file(self):
"""
Uploads the current file via a call to a Dropbox-Uploader script
"""
output_path = "/home/pi/Desktop/Projects/SIOT_Project/output_data/"
upload = "/home/pi/Desktop/Projects/SIOT_Project/Dropbox-Uploader/dropbox_uploader.sh upload " + output_path + \
self.filename + " " + self.filename
call([upload], shell=True)
def append_data(self, debug=True):
"""
Reads raw data from the sensor and writes it to a file
"""
data = []
for row in self.amg.pixels:
# Pad to 1 decimal place
row_data = [float(round(temp, 2)) for temp in row]
data.append(row_data)
# Turn track mode on...
nppixels = np.array(data)
if nppixels.max() >= 14 and nppixels.std() > 1.5:
self.track = True
self.alerted = False
self.temp_img = data
# Most of the times... OR: Turn track mode off again...
else:
# Set track flag to False, empty temp_data and set alerted as True
self.track = False
line_data = [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f'")[:-3], data, self.track]
json.dump(line_data, self.file)
self.file.write("\n")
if debug:
print("Track Mode: ", self.track," Time: ", datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f'")[:-3])
print("\n")
# When Track mode is turned off again... Alert user via WhatsApp
if not self.alerted and not self.track and self.temp_img != []:
if debug:
print("Saving image. Time: ", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f'")[:-3])
self.save_images(self.temp_img)
self.alerted = True
self.temp_img = []
time.sleep(self.period)
def save_images(self, temp_data, debug=True):
"""
Alerts the user with a Whatsapp message
"""
if debug:
print("The temp_data array: ")
print(temp_data)
pixels = []
points = [(math.floor(ix / 8), (ix % 8)) for ix in range(0, 64)]
grid_x, grid_y = np.mgrid[0:7:32j, 0:7:32j]
for row in temp_data:
pixels = pixels + row
bicubic = np.rot90(griddata(points, pixels, (grid_x, grid_y), method='cubic'))
if debug:
print("The bicubic array: ")
print(bicubic)
plt.imsave('/home/pi/Desktop/Projects/SIOT_Project/output_images/' + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f'")[:-3], bicubic, format='png')
if __name__ == '__main__':
sensor_data_node = AMG8833AdaptiveUploader()