-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera_processor.py
80 lines (60 loc) · 2.33 KB
/
camera_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import cv2
import torch
import numpy as np
import cv2
import argparse
from time import time, sleep
import os
import sys
from ultralytics import YOLO
import redis
# Configure OpenCV's FFmpeg capture backend to use UDP as the RTSP transport.
# This runs at import time, before any cv2.VideoCapture in this file is created.
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;udp'
class CameraProcessor(object):
    """Run YOLO detection on an RTSP camera stream and publish labels to Redis.

    For every frame read from the stream, the Redis list named by
    ``PREDICTION_LIST_NAME`` is replaced with the class names detected in
    that frame, and the annotated frame is shown in an OpenCV window.
    """

    # Redis list that holds the labels detected in the most recent frame.
    PREDICTION_LIST_NAME = 'predictions'

    def __init__(self, rtsp_stream, output_video_file_path):
        """Open the RTSP stream, load the YOLO model and create a Redis client.

        Args:
            rtsp_stream: URL handed to ``cv2.VideoCapture``.
            output_video_file_path: Stored on the instance; the code in this
                class does not use it.

        Calls ``sys.exit(-1)`` if the stream cannot be opened.
        """
        self.rtsp_stream = rtsp_stream
        self.output_video_file_path = output_video_file_path
        self.stream = cv2.VideoCapture(self.rtsp_stream, cv2.CAP_FFMPEG)
        if not self.stream.isOpened():
            print('Could not open RTSP stream {url}'.format(url=self.rtsp_stream))
            self.stream.release()
            sys.exit(-1)
        self.model = YOLO()
        self.classes = self.model.names
        # Only used for the warning below; it is not passed to the model here.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        if self.device == 'cpu':
            print('Warning CUDA is not available')
        self.redis = redis.Redis(host='localhost', port=6379)
        # Seconds to sleep after each processed frame.
        self.frame_delay = 0

    def process_video_stream(self):
        """Process frames until the stream stops delivering or 'q' is pressed.

        The capture device and all OpenCV windows are released on every exit
        path, including exceptions and Ctrl-C.

        Raises:
            RuntimeError: If the capture stream is not open when called.
        """
        # A real exception instead of ``assert``: asserts vanish under ``-O``.
        if not self.stream.isOpened():
            raise RuntimeError(
                'RTSP stream {url} is not open'.format(url=self.rtsp_stream))
        try:
            while True:
                ret, frame = self.stream.read()
                if not ret:
                    # Stream ended or dropped: stop cleanly instead of crashing.
                    print('Could not read frame from RTSP stream {url}'.format(
                        url=self.rtsp_stream))
                    break
                results = self.model.predict(frame)
                labels = self._extract_labels(results)
                for label in labels:
                    print(label)
                self._publish_labels(labels)
                cv2.imshow("DBot camera", self._render(results[0]))
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                sleep(self.frame_delay)
        finally:
            self.stream.release()
            cv2.destroyAllWindows()

    def _extract_labels(self, results):
        """Return the class name of every detected box, in detection order."""
        labels = []
        for result in results:
            for cls in result.boxes.cls:
                labels.append(self.model.names[int(cls)])
        return labels

    def _publish_labels(self, labels):
        """Replace the Redis prediction list with ``labels`` in one transaction."""
        pipe = self.redis.pipeline()
        # DELETE empties the list in one command instead of LLEN + N x LPOP.
        pipe.delete(self.PREDICTION_LIST_NAME)
        if labels:
            # One multi-value LPUSH yields the same order as pushing one by one.
            pipe.lpush(self.PREDICTION_LIST_NAME, *labels)
        pipe.execute()

    @staticmethod
    def _render(result):
        """Return the frame annotated with the detections of ``result``."""
        # NOTE(review): current Ultralytics documents Results.plot(); the
        # original call, visualize(), is kept as a fallback for releases
        # that still provide it - confirm against the installed version.
        draw = getattr(result, 'plot', None)
        if draw is None:
            draw = result.visualize
        return draw()
def main(argv=None):
    """Parse command-line options and run the camera processing loop.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Exits with an argparse usage error if ``--camera_rtsp_url`` is missing
    or blank.
    """
    parser = argparse.ArgumentParser()
    # Required: when omitted the value is None and ``.strip()`` would raise
    # AttributeError instead of showing a usage message.
    parser.add_argument('--camera_rtsp_url', required=True,
                        help='RTSP URL of the camera stream to process')
    parser.add_argument('--output_video_file_path',
                        help='forwarded to CameraProcessor')
    args = parser.parse_args(argv)

    camera_rtsp_url = args.camera_rtsp_url.strip()
    if not camera_rtsp_url:
        parser.error('--camera_rtsp_url must not be empty')

    camera_process = CameraProcessor(camera_rtsp_url,
                                     args.output_video_file_path)
    camera_process.process_video_stream()


if __name__ == '__main__':
    main()