-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathmain.py
129 lines (100 loc) · 3.38 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import argparse
import asyncio
import json
import logging
import os
import platform
import ssl
import time
import cv2
from aiohttp import web
from aiortc import (
MediaStreamTrack,
RTCDataChannel,
RTCPeerConnection,
RTCSessionDescription,
VideoStreamTrack,
)
from aiortc.contrib.media import MediaPlayer, MediaRelay
from av import VideoFrame
# Directory containing this script; index() and javascript() read the static
# client files (index.html, client.js) from here.
ROOT = os.path.dirname(__file__)
# Module-level placeholders initialised to None; nothing in the visible code
# assigns or reads them afterwards.
relay = None
webcam = None
async def index(request):
    """Serve the demo's HTML page.

    Reads ``index.html`` from ROOT on every request and returns it as a
    ``text/html`` response. The ``request`` argument is unused.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(os.path.join(ROOT, "index.html"), "r") as page:
        content = page.read()
    return web.Response(content_type="text/html", text=content)
async def javascript(request):
    """Serve the client-side script.

    Reads ``client.js`` from ROOT on every request and returns it as an
    ``application/javascript`` response. The ``request`` argument is unused.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(os.path.join(ROOT, "client.js"), "r") as script:
        content = script.read()
    return web.Response(content_type="application/javascript", text=content)
async def offer(request):
    """Answer a WebRTC offer posted by the browser.

    The request body is JSON with ``sdp`` and ``type`` keys. A new peer
    connection is created, remembered in ``pcs``, negotiated through
    ``server()``, and its local description is returned as JSON with the
    same two keys.
    """
    payload = await request.json()
    remote_description = RTCSessionDescription(
        sdp=payload["sdp"], type=payload["type"]
    )

    # Track the connection so on_shutdown() can close it later.
    pc = RTCPeerConnection()
    pcs.add(pc)

    await server(pc, remote_description)

    answer_body = json.dumps(
        {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
    )
    return web.Response(content_type="application/json", text=answer_body)
# Peer connections created by offer(); a connection is discarded when its state
# becomes "failed", and the whole set is closed and cleared in on_shutdown().
pcs = set()
async def server(pc, offer):
    """Wire up event handlers on ``pc`` and complete the offer/answer exchange.

    Registers a connection-state handler and a track handler on the peer
    connection, applies ``offer`` as the remote description, then creates an
    answer and installs it as the local description. Returns nothing; the
    caller reads the result from ``pc.localDescription``.
    """

    @pc.on("connectionstatechange")
    async def handle_state_change():
        print("Connection state is %s" % pc.connectionState)
        if pc.connectionState != "failed":
            return
        # A failed connection is closed and no longer tracked.
        await pc.close()
        pcs.discard(pc)

    @pc.on("track")
    def handle_track(track):
        print("======= received track: ", track)
        if track.kind != "video":
            return
        # Send the incoming video back to the peer through the face swapper.
        swapped = FaceSwapper(track)
        pc.addTrack(swapped)

    await pc.setRemoteDescription(offer)
    local_answer = await pc.createAnswer()
    await pc.setLocalDescription(local_answer)
async def on_shutdown(app):
    """Application shutdown hook: close all tracked peer connections.

    Every connection in ``pcs`` is closed (the close calls are awaited
    together), after which the set is emptied. ``app`` is unused.
    """
    await asyncio.gather(*(connection.close() for connection in pcs))
    pcs.clear()
class FaceSwapper(VideoStreamTrack):
    """Video track that covers every detected face with an overlay image.

    Wraps an incoming video track. For each frame received from it, faces are
    located with a Haar cascade and each face rectangle is overwritten with
    the overlay image resized to that rectangle.

    Both assets are loaded from paths relative to the current working
    directory: ``./haarcascade_frontalface_alt.xml`` and ``./wu.png``.
    """

    kind = "video"

    def __init__(self, track):
        """Store the source track and load the detector and overlay image.

        Args:
            track: the source video track whose frames are transformed.

        Raises:
            FileNotFoundError: if the cascade file or the overlay image
                cannot be loaded.
        """
        super().__init__()
        self.track = track

        self.face_detector = cv2.CascadeClassifier("./haarcascade_frontalface_alt.xml")
        # A missing/invalid cascade file yields an empty classifier rather
        # than an exception; fail here instead of on the first frame.
        if self.face_detector.empty():
            raise FileNotFoundError(
                "could not load face cascade: ./haarcascade_frontalface_alt.xml"
            )

        self.face = cv2.imread("./wu.png")
        # cv2.imread signals failure by returning None; without this check the
        # error only surfaces later inside cv2.resize when a face is detected.
        if self.face is None:
            raise FileNotFoundError("could not load overlay image: ./wu.png")

    async def recv(self):
        """Return the next source frame with detected faces replaced.

        The output frame takes its ``pts`` and ``time_base`` from
        ``next_timestamp()`` rather than from the source frame.
        """
        timestamp, video_timestamp_base = await self.next_timestamp()
        frame = await self.track.recv()
        image = frame.to_ndarray(format="bgr24")

        # Detection runs on a grayscale copy; the overlay is pasted into the
        # colour image.
        gray = cv2.cvtColor(image, code=cv2.COLOR_BGR2GRAY)
        face_zones = self.face_detector.detectMultiScale(gray)
        for x, y, w, h in face_zones:
            overlay = cv2.resize(self.face, dsize=(w, h))
            image[y : y + h, x : x + w] = overlay

        swapped = VideoFrame.from_ndarray(image, format="bgr24")
        swapped.pts = timestamp
        swapped.time_base = video_timestamp_base
        return swapped
if __name__ == "__main__":
    # Command-line options: the address and port the HTTP server binds to.
    cli = argparse.ArgumentParser(description="WebRTC webcam demo")
    cli.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host for HTTP server (default: 0.0.0.0)",
    )
    cli.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port for HTTP server (default: 8080)",
    )
    args = cli.parse_args()

    logging.basicConfig(level=logging.INFO)

    # Build the aiohttp application: shutdown hook first, then the routes.
    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    for path, handler in (("/", index), ("/client.js", javascript)):
        app.router.add_get(path, handler)
    app.router.add_post("/offer", offer)

    web.run_app(app, host=args.host, port=args.port)