forked from geezacoleman/OpenWeedLocator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relay_control.py
185 lines (155 loc) · 7.15 KB
/
relay_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from logger import Logger
from threading import Thread, Condition
from utils.cli_vis import NozzleVis
import collections
import time
import os
import platform
# check if the system is being tested on a Windows or Linux x86 64 bit machine
if platform.system() == "Windows":
testing = True
else:
if '64' in platform.machine():
testing = True
else:
from gpiozero import Buzzer, OutputDevice
testing = False
# two test classes to run the analysis on a desktop computer if a "win32" platform is detected
class TestRelay:
def __init__(self, relayNumber, verbose=False):
self.relayNumber = relayNumber
self.verbose = verbose
def on(self):
if self.verbose:
print(f"[TEST] Relay {self.relayNumber} ON")
def off(self):
if self.verbose:
print(f"[TEST] Relay {self.relayNumber} OFF")
class TestBuzzer:
def beep(self, on_time: int, off_time: int, n=1, verbose=False):
for i in range(n):
if verbose:
print('BEEP')
# control class for the relay board
class RelayControl:
def __init__(self, solenoidDict):
self.testing = True if testing else False
self.solenoidDict = solenoidDict
self.on = False
if not self.testing:
self.buzzer = Buzzer(pin='BOARD7')
for nozzle, boardPin in self.solenoidDict.items():
self.solenoidDict[nozzle] = OutputDevice(pin=f'BOARD{boardPin}')
else:
self.buzzer = TestBuzzer()
for nozzle, boardPin in self.solenoidDict.items():
self.solenoidDict[nozzle] = TestRelay(boardPin)
def relay_on(self, solenoidNumber, verbose=True):
relay = self.solenoidDict[solenoidNumber]
relay.on()
if verbose:
print(f"Solenoid {solenoidNumber} ON")
def relay_off(self, solenoidNumber, verbose=True):
relay = self.solenoidDict[solenoidNumber]
relay.off()
if verbose:
print(f"Solenoid {solenoidNumber} OFF")
def beep(self, duration=0.2, repeats=2):
self.buzzer.beep(on_time=duration, off_time=(duration / 2), n=repeats)
def all_on(self):
for nozzle in self.solenoidDict.keys():
self.relay_on(nozzle)
def all_off(self):
for nozzle in self.solenoidDict.keys():
self.relay_off(nozzle)
def remove(self, solenoidNumber):
self.solenoidDict.pop(solenoidNumber, None)
def clear(self):
self.solenoidDict = {}
def stop(self):
self.clear()
self.all_off()
# this class does the hard work of receiving detection 'jobs' and queuing them to be actuated. It only turns a nozzle on
# if the sprayDur has not elapsed or if the nozzle isn't already on.
class Controller:
def __init__(self, nozzleDict, vis=False):
self.nozzleDict = nozzleDict
self.vis = vis
# instantiate relay control with supplied nozzle dictionary to map to correct board pins
self.solenoid = RelayControl(self.nozzleDict)
self.nozzleQueueDict = {}
self.nozzleconditionDict = {}
# start the logger and log file using absolute path of python file
self.saveDir = os.path.join(os.path.dirname(__file__), 'logs')
self.logger = Logger(name="weed_log.txt", saveDir=self.saveDir)
# create a job queue and Condition() for each nozzle
print("[INFO] Setting up nozzles...")
self.nozzle_vis = NozzleVis(relays=len(self.nozzleDict.keys()))
for nozzle in range(0, len(self.nozzleDict)):
self.nozzleQueueDict[nozzle] = collections.deque(maxlen=5)
self.nozzleconditionDict[nozzle] = Condition()
# create the consumer threads, setDaemon and start the threads.
nozzleThread = Thread(target=self.consumer, args=[nozzle])
nozzleThread.setDaemon(True)
nozzleThread.start()
time.sleep(1)
print("[INFO] Nozzle setup complete. Initiating camera...")
self.solenoid.beep(duration=0.5)
def receive(self, nozzle, timeStamp, location=0, delay=0, duration=1):
"""
this method adds a new spray job to specified nozzle queue. GPS location data etc to be added. Time stamped
records the true time of weed detection from main thread, which is compared to time of nozzle activation for accurate
on durations. There will be a minimum on duration of this processing speed ~ 0.3s. Will default to 0 though.
:param nozzle: nozzle number (zero based)
:param timeStamp: this is the time of detection
:param location: GPS functionality to be added here
:param delay: on delay to be added in the future
:param duration: duration of spray
"""
inputQMessage = [nozzle, timeStamp, delay, duration]
inputQ = self.nozzleQueueDict[nozzle]
inputCondition = self.nozzleconditionDict[nozzle]
# notifies the consumer thread when something has been added to the queue
with inputCondition:
inputQ.append(inputQMessage)
inputCondition.notify()
line = f"nozzle: {nozzle} | time: {timeStamp} | location {location} | delay: {delay} | duration: {duration}"
self.logger.log_line(line, verbose=False)
def consumer(self, nozzle):
"""
Takes only one parameter - nozzle, which enables the selection of the deque, condition from the dictionaries.
The consumer method is threaded for each nozzle and will wait until it is notified that a new job has been added
from the receive method. It will then compare the time of detection with time of spraying to activate that nozzle
for requried length of time.
:param nozzle: nozzle vlaue
"""
self.running = True
inputCondition = self.nozzleconditionDict[nozzle]
inputCondition.acquire()
nozzleOn = False
nozzleQueue = self.nozzleQueueDict[nozzle]
while self.running:
while nozzleQueue:
sprayJob = nozzleQueue.popleft()
inputCondition.release()
# check to make sure time is positive
onDur = 0 if (sprayJob[3] - (time.time() - sprayJob[1])) <= 0 else (sprayJob[3] - (time.time() - sprayJob[1]))
if not nozzleOn:
time.sleep(sprayJob[2]) # add in the delay variable
self.solenoid.relay_on(nozzle, verbose=False)
if self.vis:
self.nozzle_vis.update(relay=nozzle, status=True)
nozzleOn = True
try:
time.sleep(onDur)
self.logger.log_line(f'[INFO] onDur {onDur} for nozzle {nozzle} received.')
except ValueError:
time.sleep(0)
self.logger.log_line(f'[ERROR] negative onDur {onDur} for nozzle {nozzle} received. Turning on for 0 seconds.')
inputCondition.acquire()
if len(nozzleQueue) == 0:
self.solenoid.relay_off(nozzle, verbose=False)
if self.vis:
self.nozzle_vis.update(relay=nozzle, status=False)
nozzleOn = False
inputCondition.wait()