-
Notifications
You must be signed in to change notification settings - Fork 0
/
presets.py
executable file
·326 lines (278 loc) · 13.8 KB
/
presets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/python3
import argparse
import signal
import os
import traceback
import re
import subprocess
import time
import threading
import json
from visca_over_ip.camera import *
import sms # sms.py local file
import global_file as gf
NUM_RETRIES = 5
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self,signum, frame):
print('\n!!Received SIGINT or SIGTERM!!')
self.kill_now = True
def signal_handler(sig, frame):
print('\n!!Exiting from Ctrl+C or SIGTERM!!')
sys.exit(0)
def set_preset(ward, cam_ip, preset_file, preset, num_from = None, num_to = None, verbose = False):
try:
if(os.path.exists(preset_file)):
with open(preset_file, "r") as presetFile:
presets = json.load(presetFile)
except:
if(verbose): print(traceback.format_exc())
print("Error reading preset file")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had an error reading the preset file in set method!", verbose)
return
try:
ptz_cam = Camera(cam_ip)
except:
if(verbose): print(traceback.format_exc())
print("Failure connecting to VISCA Camera")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had a failure connecting to the VISCA port on the camera in set method!", verbose)
return
for preset_name in presets:
if(presets[preset_name]['preset'] == preset):
print(preset_name)
try:
ptz_cam.pantilt(24, 24, presets[preset_name]['pan'], presets[preset_name]['tilt'])
ptz_cam.zoom_to(presets[preset_name]['zoom'])
except:
if(verbose): print(traceback.format_exc())
print("Failure setting camera PTZ position")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had a failure setting camera PTZ position in set method!", verbose)
last_pan = None
last_tilt = None
last_zoom = None
coordinate_found = None
while(coordinate_found is None and not gf.killer.kill_now):
try:
pan, tilt = ptz_cam.get_pantilt_position()
zoom = ptz_cam.get_zoom_position()
if(pan != last_pan or tilt != last_tilt or zoom != last_zoom):
print("Camera Moving")
else:
#need to put stuff here to report or save PTZ
coordinate_found = True
last_pan = pan
last_tilt = tilt
last_zoom = zoom
time.sleep(1)
except:
if(verbose): print(traceback.format_exc())
gf.log_exception(traceback.format_exc(), "failure getting camera PTZ")
print("Failure getting camera PTZ position")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had a failure getting camera PTZ position in set method!", verbose)
time.sleep(10)
def record_presets(ward, cam_ip, preset_file, num_from = None, num_to = None, verbose = False):
try:
if(os.path.exists(preset_file)):
with open(preset_file, "r") as presetFile:
presets = json.load(presetFile)
except:
if(verbose): print(traceback.format_exc())
print("Error reading preset file")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had an error reading the preset file!", verbose)
return
try:
ptz_cam = Camera(cam_ip)
except:
if(verbose): print(traceback.format_exc())
gf.log_exception(traceback.format_exc(), "failure connecting to the VISCA port on the camera")
print("Failure connecting to VISCA Camera")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had a failure connecting to the VISCA port on the camera!", verbose)
return
last_preset = None
for preset_name in presets:
print("Finding coordinates for preset " + preset_name)
# run through each preset and find it's coordinates
subprocess.run(["curl", "http://" + cam_ip + "/cgi-bin/ptzctrl.cgi?ptzcmd&poscall&" + str(presets[preset_name]['preset'])], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
last_pan = None
last_tilt = None
last_zoom = None
coordinate_found = None
while(coordinate_found is None and not gf.killer.kill_now):
try:
pan, tilt = ptz_cam.get_pantilt_position()
zoom = ptz_cam.get_zoom_position()
if(pan != last_pan or tilt != last_tilt or zoom != last_zoom):
preset = 0
if(preset != last_preset):
print("Camera Moving")
else:
preset = presets[preset_name]['preset']
presets[preset_name]['pan'] = pan
presets[preset_name]['tilt'] = tilt
presets[preset_name]['zoom'] = zoom
coordinate_found = True
last_pan = pan
last_tilt = tilt
last_zoom = zoom
time.sleep(1)
except:
if(verbose): print(traceback.format_exc())
print("Failure getting preset coordinates")
try:
if(os.path.exists(preset_file)):
with open(preset_file, "w") as presetFile:
json.dump(presets, presetFile, indent=4, default=str)
except:
if(verbose): print(traceback.format_exc())
print("Error writing preset file")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had an error writing the preset file!", verbose)
return
def report_preset(delay, ward, cam_ip, preset_file, preset_status_file, num_from = None, num_to = None, verbose = False):
last_preset = None
last_pan = None
last_tilt = None
last_zoom = None
time.sleep(delay)
try:
if(os.path.exists(preset_file)):
with open(preset_file, "r") as presetFile:
presets = json.load(presetFile)
except:
if(verbose): print(traceback.format_exc())
print("Error reading preset file")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had an error reading the preset file!", verbose)
return
exception = None
for retry_num in range(NUM_RETRIES):
exception = None
tb = None
try:
ptz_cam = Camera(cam_ip)
break
except Exception as exc:
exception = exc
tb = traceback.format_exc()
if(verbose): print('!!VISCA Connection Retry!!')
gf.sleep(0.5,1)
if exception:
if(verbose): print(tb)
print("Failure connecting to VISCA Camera")
if(num_from is not None and num_to is not None):
sms.send_sms(num_from, num_to, ward + " had a failure connecting to the VISCA port on the camera!", verbose)
return
while(not gf.killer.kill_now):
try:
pan, tilt = ptz_cam.get_pantilt_position()
zoom = ptz_cam.get_zoom_position()
if(pan != last_pan or tilt != last_tilt or zoom != last_zoom):
preset = 0
if(preset != last_preset):
print("Camera Moving")
if(preset_status_file is not None):
with open(preset_status_file, "w") as presetstatusFile:
presetstatusFile.write('0\n')
else:
# we only want to check for this after the camera has finished
# moving, so check for preset = 0 to determine the last pass
# through this was the camera moving
if(preset == 0):
# camera has stopped moving, so if we're attempting to
# terminate the local stream, send that signal now.
if(gf.stream_event.is_set()):
gf.stream_event_terminate.set()
preset = None
for preset_name in presets:
if(pan == presets[preset_name]['pan'] and tilt == presets[preset_name]['tilt'] and zoom == presets[preset_name]['zoom']):
preset = presets[preset_name]['preset']
if(preset != last_preset):
print(preset_name)
if(preset != last_preset and preset is None):
print("Undefined")
preset = -1
if(preset_status_file is not None):
with open(preset_status_file, "w") as presetstatusFile:
presetstatusFile.write(str(preset)+'\n')
last_preset = preset
last_pan = pan
last_tilt = tilt
last_zoom = zoom
gf.consecutive_ptz_status_failures = 0
time.sleep(1)
except:
if(verbose): print(traceback.format_exc())
print("Failure getting camera PTZ position")
gf.consecutive_ptz_status_failures += 1
# camera PTZ position failures are not a huge isssue
# the random wait on retry didn't seem to fix the issue
# so to prevent constant text messages only send message
# after several consecutive failures
if(gf.consecutive_ptz_status_failures >= gf.pts_status_retries):
# if we've reached this state it's likely the next request
# will also fail, so zero the counter to prevent double
# messages
gf.consecutive_ptz_status_failures = 0
if(gf.ptz_sms_sent <= gf.ptz_sms_max):
gf.ptz_sms_sent += 1
# Have yet to see this failure notification be anything more
# than just camera unresponsiveness or network packets
# getting dropped, all this SMS notification does is fill
# up the notification system and make it harder to see
# real problems
#if(num_from is not None and num_to is not None):
# sms.send_sms(num_from, num_to, ward + " had a failure getting camera PTZ position!", verbose)
time.sleep(1)
ptz_cam.close_connection()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Report out camera current preset by getting camera positions and verifying against preset locations')
parser.add_argument('-c','--config-file',type=str,help='JSON Configuration file')
parser.add_argument('-w','--ward',type=str,help='Name of Ward being broadcast')
parser.add_argument('-p','--pc-name',type=str,help='System name that script is running on.')
parser.add_argument('--preset-file',type=str,help='JSON file where camera presets are stored.')
parser.add_argument('--record-presets',default=False,action='store_true',help='Updates preset positions in preset-file')
parser.add_argument('--set-preset',type=int,help='Set camera preset to value from preset-file')
parser.add_argument('-R','--rtsp-stream',type=str,help='Use to specify an RTSP stream on the network to use instead of USB camera')
parser.add_argument('-F','--num-from',type=str,help='SMS notification from number - Twilio account number')
parser.add_argument('-T','--num-to',type=str,help='SMS number to send notification to')
parser.add_argument('-v','--verbose',default=False,action='store_true',help='Increases vebosity of error messages')
args = parser.parse_args()
gf.killer = GracefulKiller()
if(args.config_file is not None and os.path.exists(args.config_file)):
with open(args.config_file, "r") as configFile:
config = json.load(configFile)
# check for keys in config file
if 'broadcast_ward' in config:
args.ward = config['broadcast_ward']
if 'preset_file' in config:
args.preset_file = config['preset_file']
if 'source_rtsp_stream' in config:
args.rtsp_stream = config['source_rtsp_stream']
if 'preset_status_file' in config:
preset_status_file = config['preset_status_file']
if 'notification_text_from' in config:
args.num_from = config['notification_text_from']
if 'notification_text_to' in config:
args.num_to = config['notification_text_to']
if(args.rtsp_stream is not None):
ip_pattern = re.compile('''((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)''')
camera_ip = ip_pattern.search(args.rtsp_stream).group(0)
if(camera_ip is None and (args.config_file is None or (args.pc_name is None and args.preset_file is None and args.rtsp_stream is None))):
print("A valid configuration file and rtsp stream are required to monitor presets.")
if(args.record_presets):
record_presets(args.ward if args.pc_name is None else args.pc_name, camera_ip, args.preset_file, args.num_from, args.num_to, args.verbose)
exit()
if(args.set_preset is not None):
set_preset(args.ward if args.pc_name is None else args.pc_name, camera_ip, args.preset_file, args.set_preset, args.num_from, args.num_to, args.verbose)
exit()
if(camera_ip is not None and (args.ward is not None or args.pc_name is not None) and args.preset_file is not None and preset_status_file is not None):
report_preset(0, args.ward if args.pc_name is None else args.pc_name, camera_ip, args.preset_file, preset_status_file, args.num_from, args.num_to, args.verbose)