forked from PeterL1n/BackgroundMattingV2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
93 lines (73 loc) · 2.73 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import cv2
import argparse
from threading import Thread, Lock
from vidgear.gears import NetGear
parser = argparse.ArgumentParser(description='Inference from web-cam')
parser.add_argument('--ip', type=str, default="127.0.0.1")
parser.add_argument('--resolution', type=int, nargs=2, metavar=('width', 'height'), default=(1280, 720))
parser.add_argument('--camera-device', type=int, default=0)
parser.add_argument('--crop-width', type=float, default=1.0)
args = parser.parse_args()
device = args.camera_device
class Camera:
def __init__(self, device_id=0, width=1280, height=720):
self.capture = cv2.VideoCapture(device_id)
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
# self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2)
self.success_reading, self.frame = self.capture.read()
self.read_lock = Lock()
self.thread = Thread(target=self.__update, args=())
self.thread.daemon = True
self.thread.start()
def __update(self):
while self.success_reading:
grabbed, frame = self.capture.read()
with self.read_lock:
self.success_reading = grabbed
self.frame = frame
def read(self):
with self.read_lock:
frame = self.frame.copy()
return frame
def __exit__(self, exec_type, exc_value, traceback):
self.capture.release()
width, height = args.resolution
cam = Camera(device_id=args.camera_device, width=width, height=height)
crop_width = args.crop_width
middle = int(cam.width // 2)
crop_start = middle - int((cam.width * crop_width)//2)
crop_end = middle + int((cam.width * crop_width)//2)
# define tweak flags
#options = {"flag": 0, "copy": False, "track": False, "bidirectional_mode"=True}
options = {"bidirectional_mode" : True}
client = NetGear(
address=args.ip,
port="5454",
protocol="tcp",
pattern=0,
logging=True,
**options
)
# loop over until KeyBoard Interrupted
key = "a"
while True:
try:
# read frames from stream
frame = cam.read()
# send frame to compute_node
processed_frame = client.send(frame[:,crop_start:crop_end,:], message=key)
if not (processed_frame is None):
cv2.imshow("Output Frame", processed_frame)
# check for 'q' key if pressed
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
client.send(frame[:,crop_start:crop_end,:], message=key)
break
except KeyboardInterrupt:
break
# close output window
cv2.destroyAllWindows()
exit()