-
Notifications
You must be signed in to change notification settings - Fork 4
/
sort.py
150 lines (130 loc) · 5.06 KB
/
sort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
import os
import cv2
import yaml
import numpy as np
from tensorflow.keras.models import load_model
from libs.serial_comms import connect_serial, send_data
from libs.image import prepare_image, get_foreground
from libs.actions import get_most_frequent, sort_item
from libs.camera import warmup_camera, detect_motion
from libs.information import Information
try:
# Configuration
with open(os.path.join("CNN", "config.yml"), 'r') as ymlfile:
params = yaml.load(ymlfile)
source_dir = params["source_dir"]
save_dir = params["save_dir"]
save_dir = os.path.join("CNN", save_dir)
model_path = os.path.join(save_dir, "Fold0-0.9948.hdf5")
image_size = (256, 256)
stabilization_iterations = 10
prediction_iterations = 3
# Get labels
train_dir = os.path.join("CNN", source_dir)
labels = os.listdir(train_dir)
labels.sort()
# Connect to base
print("Waiting for base to connect...", end=" ")
bt = connect_serial('/dev/rfcomm0', 19200)
print("Done")
# Load trained model
model = load_model(model_path)
model.summary()
print("Model loaded")
# Input video stream
vid = cv2.VideoCapture(0)
if not vid.isOpened():
raise IOError("Couldn't open webcam or video")
print("Waiting for camera to warm up...", end=" ")
warmup_camera(vid, 2)
print("Done")
# Motion detection background subtractor
motion_subtractor = cv2.createBackgroundSubtractorMOG2(history=10,
varThreshold=300)
# First frame for background
_, frame = vid.read()
background = cv2.resize(frame, image_size, interpolation=cv2.INTER_AREA)
# Initial state
s = {
"waiting_object": 0,
"waiting_base": 1
}
state = s["waiting_object"]
motion_list = [False] * stabilization_iterations
image_stable = False
moved_prev = False
# Stop the motion detector from triggering at beginning
motion_mask = motion_subtractor.apply(background)
# Create information screen
information = Information()
while True:
# Get foreground image of object for the prediction
return_value, frame = vid.read()
image = cv2.resize(frame, image_size, interpolation=cv2.INTER_AREA)
motion_mask = motion_subtractor.apply(image)
foreground_image = get_foreground(image, background)
# Show stream
cv2.imshow("Camera Feed", image)
cv2.moveWindow("Camera Feed", 0, 0)
cv2.imshow("Movement", motion_mask)
cv2.moveWindow("Movement", 0, 380)
cv2.imshow("Foreground", foreground_image)
cv2.moveWindow("Foreground", 400, 0)
cv2.imshow("Information", information.image)
cv2.moveWindow("Information", 400, 380)
# Check if object has moved in the last few frames
motion_detected = detect_motion(motion_mask, image_size)
motion_list.pop(0)
motion_list.append(motion_detected)
if True in motion_list:
moved_prev = True
image_stable = False
else:
image_stable = True
# Check for new object and predict
if state == s["waiting_object"]:
# Check for trigger falling edge (wait for image to stabilize)
if image_stable and moved_prev:
preprocessed_image = prepare_image(foreground_image)
predictions = []
# Predict image class
for i in range(prediction_iterations):
preds = model.predict(preprocessed_image)
preds = np.argmax(preds[0], axis=0)
predictions.append(preds)
prediction = get_most_frequent(predictions)
prediction = labels[prediction]
sorted_class = sort_item(prediction)
if sorted_class is not None:
# Go to corresponding bin
send_data(bt, sorted_class)
information.update(prediction)
state = s["waiting_base"]
else:
print(f"No bin specified for class {prediction}")
# Wait for base to report readiness
elif state == s["waiting_base"]:
bt.reset_input_buffer()
in_data = bt.readline().decode()
if len(in_data) > 0:
# print(repr(in_data))
if in_data[:1] == "c":
bt.write("d".encode())
bt.flush()
state = s["waiting_object"]
information.update()
# Refresh background image
_, frame = vid.read()
background = cv2.resize(frame, image_size,
interpolation=cv2.INTER_AREA)
# Reset motion data
motion_list = [False] * stabilization_iterations
moved_prev = False
# Quit
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
vid.release()
cv2.destroyAllWindows()
bt.close()