-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssdp.py
171 lines (136 loc) · 5.13 KB
/
ssdp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# -*- coding: utf-8 -*-
import socket
import struct
import asyncio
import logging
import errno
# Module-wide logger, registered under the fixed name 'ssdp'.
logger = logging.getLogger('ssdp')
# Destination address and port used for outgoing NOTIFY and M-SEARCH messages
# (see send_notify / send_search, which build their target from these two).
MULTICAST_ADDRESS = '239.255.255.250'
MULTICAST_PORT = 1900
class SSDPDevice():
    """A device described by its USN string and its description location.

    The accessors below parse the USN on demand.  A USN of ``None`` (which is
    what callers get from ``headers.get('usn')`` when a datagram carries no
    USN header) is tolerated: the accessors then return ``None`` instead of
    raising ``AttributeError``.
    """

    def __init__(self, usn, location):
        """Store the raw USN and location; start with no sub-devices.

        :param usn: USN header value, or ``None`` if it was absent.
        :param location: LOCATION header value, or ``None`` if it was absent.
        """
        self.usn = usn
        self.location = location
        # Starts empty; nothing in this class adds to it.
        self.subdevices = []

    def __repr__(self):
        return "{}(usn={!r}, location={!r})".format(
            type(self).__name__, self.usn, self.location)

    def uuid(self):
        """Return the second ``':'``-separated field of the USN.

        Falls back to the unmodified USN when there is no such field or when
        the USN is not a string (e.g. ``None``).
        """
        try:
            return self.usn.split(':')[1]
        except (AttributeError, IndexError):
            return self.usn

    def target(self):
        """Return the field that follows the first ``'::'`` in the USN.

        Falls back to the unmodified USN when it contains no ``'::'`` or when
        the USN is not a string (e.g. ``None``).
        """
        try:
            return self.usn.split('::')[1]
        except (AttributeError, IndexError):
            return self.usn

    def matches_target(self, search_target):
        """Return True if *search_target* is ``'ssdp:all'`` or equals ``target()``."""
        return search_target == 'ssdp:all' or search_target == self.target()
class SimpleServiceDiscoveryProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that sends and receives SSDP-style messages.

    Incoming datagrams are dispatched on their first line to one of three
    handlers (NOTIFY, M-SEARCH, or a ``200 OK`` search response).  Devices
    learned from NOTIFY ``ssdp:alive`` messages and from search responses are
    passed to ``device_callback``.  Devices registered through
    ``announce_device`` are advertised and used to answer incoming searches.
    """

    def __init__(self, device_callback=None, filter=None):
        """Set up callback, filter and the start-line dispatch table.

        :param device_callback: callable invoked with an ``SSDPDevice`` for
            each discovered device; defaults to a no-op.
        :param filter: search target used both for outgoing searches and to
            filter discovered devices; ``None`` (or any falsy value) disables
            filtering.
        """
        self.device_callback = device_callback or (lambda _: None)
        self.local_devices = []
        self.filter = filter  # filtering off when None
        # Set by connection_made(); send() cannot work before that.
        self.transport = None
        # Maps the exact first line of a datagram to its handler.
        self.handlers = {
            'NOTIFY * HTTP/1.1': self.handle_notify,
            'M-SEARCH * HTTP/1.1': self.handle_search,
            'HTTP/1.1 200 OK': self.handle_search_response,
        }

    def announce_device(self, device):
        """Register *device* locally and send an ``ssdp:alive`` NOTIFY for it."""
        self.local_devices.append(device)
        self.send_notify(device, notify_type='ssdp:alive')

    def remove_device(self, device):
        """Unregister *device* and send an ``ssdp:byebye`` NOTIFY for it.

        :raises ValueError: if *device* was never announced (from
            ``list.remove``).
        """
        self.local_devices.remove(device)
        self.send_notify(device, notify_type='ssdp:byebye')

    def search_devices(self):
        """Send an M-SEARCH for ``self.filter``, or for the default target if unset."""
        if self.filter is None:
            self.send_search()
        else:
            self.send_search(search_target=self.filter)

    def connection_made(self, transport):
        """Remember the transport so that ``send`` can use it."""
        self.transport = transport

    def send(self, data, addr):
        """Log *data* and send it, UTF-8 encoded, to *addr*.

        :param data: message text (``str``).
        :param addr: destination address tuple; its first two items are
            logged as host and port.
        """
        # Index explicitly instead of splatting the tuple: address tuples
        # longer than two items would otherwise break the log formatting.
        logger.debug("%s:%s < \"%s\"", addr[0], addr[1], data)
        self.transport.sendto(data.encode('utf-8'), addr)

    def send_notify(self, device, notify_type='ssdp:alive'):
        """Send a NOTIFY message describing *device* to the multicast address.

        :param device: object providing ``location``, ``usn`` and ``target()``.
        :param notify_type: value placed in the ``NTS`` header.
        """
        data = (
            "NOTIFY * HTTP/1.1\r\n"
            "HOST: 239.255.255.250:1900\r\n"
            "CACHE-CONTROL: max-age=3600\r\n"
            "LOCATION: {loc}\r\n"
            "NT: {nt}\r\n"
            "NTS: {nts}\r\n"
            "SERVER: 'Linux UPnP/1.0 upnpy/0.1'\r\n"
            "USN: {usn}\r\n"
            "\r\n"
        ).format(loc=device.location, nt=device.target(), nts=notify_type,
                 usn=device.usn)
        addr = (MULTICAST_ADDRESS, MULTICAST_PORT)
        self.send(data, addr)

    def send_search(self, search_target='ssdp:all', max_delay=2):
        """Send an M-SEARCH message to the multicast address.

        :param search_target: value placed in the ``ST`` header.
        :param max_delay: value placed in the ``MX`` header.
        """
        data = (
            "M-SEARCH * HTTP/1.1\r\n"
            "HOST: 239.255.255.250:1900\r\n"
            'MAN: "ssdp:discover"\r\n'
            "ST: {st}\r\n"
            "MX: {mx}\r\n"
            "\r\n"
        ).format(st=search_target, mx=max_delay)
        addr = (MULTICAST_ADDRESS, MULTICAST_PORT)
        self.send(data, addr)

    def send_search_response(self, device, addr, search_target='ssdp:all'):
        """Send a ``200 OK`` search response describing *device* to *addr*.

        :param device: object providing ``location`` and ``usn``.
        :param addr: address tuple of the searcher.
        :param search_target: value placed in the ``ST`` header.
        """
        data = (
            "HTTP/1.1 200 OK\r\n"
            "CACHE-CONTROL: max-age=3600\r\n"
            "LOCATION: {loc}\r\n"
            "SERVER: 'Linux UPnP/1.0 upnpy/0.1'\r\n"
            "ST: {st}\r\n"
            "USN: {usn}\r\n"
            "\r\n"
        ).format(loc=device.location, st=search_target, usn=device.usn)
        self.send(data, addr)

    def datagram_received(self, data, addr):
        """Parse an incoming datagram and dispatch it to the matching handler.

        Datagrams that are not valid text, are empty, or whose first line is
        not a key of ``self.handlers`` are silently dropped.  Header names
        are lower-cased and stripped; values are stripped.  When a header
        appears more than once, the last occurrence wins.
        """
        try:
            text = data.decode()
        except UnicodeDecodeError:
            # Anything on the network can reach this port; garbage must not
            # raise out of the protocol callback.
            logger.debug("%s:%s > undecodable datagram dropped",
                         addr[0], addr[1])
            return
        logger.debug("%s:%s > \"%s\"", addr[0], addr[1], text)

        lines = text.splitlines()
        if not lines or lines[0] not in self.handlers:
            return

        headers = {}
        for line in lines[1:]:
            name, sep, value = line.partition(':')
            if sep:  # skip blank lines and lines without a colon
                headers[name.strip().lower()] = value.strip()

        self.handlers[lines[0]](headers, addr)

    def handle_notify(self, data, addr):
        """Handle a NOTIFY message given its parsed headers.

        ``ssdp:alive`` notifications that pass the filter are forwarded to
        ``device_callback``; any other NTS value is only logged.
        """
        logger.debug("NOTIFY")
        nts = data.get('nts')
        usn = data.get('usn')
        root_desc = data.get('location')
        device = SSDPDevice(usn, root_desc)
        if not self.filter or device.matches_target(self.filter):
            if nts == 'ssdp:alive':
                self.device_callback(device)
            else:
                # TODO implement ssdp:byebye
                logger.info("Notify byebye %s", device.usn)

    def handle_search(self, data, addr):
        """Answer an M-SEARCH on behalf of every matching local device.

        NOTE(review): when ``self.filter`` is unset every local device
        answers regardless of the request's ST header; only with a filter
        set is the ST compared against each device.  That coupling looks
        unintended but is preserved here — confirm before changing.
        """
        logger.debug("SEARCH")
        for device in self.local_devices:
            if 'st' not in data or not self.filter:
                self.send_search_response(device, addr)
            elif device.matches_target(data['st']):
                self.send_search_response(
                    device, addr, search_target=data['st'])

    def handle_search_response(self, data, addr):
        """Forward a device from a search response to ``device_callback`` if it passes the filter."""
        logger.debug("SEARCH RESPONSE")
        usn = data.get('usn')
        root_desc = data.get('location')
        device = SSDPDevice(usn, root_desc)
        if not self.filter or device.matches_target(self.filter):
            self.device_callback(device)

    def error_received(self, exc):
        """Log transient would-block errors; re-raise anything else.

        :param exc: the exception reported by the transport.
        :raises IOError: for every error whose ``errno`` is not ``EAGAIN`` /
            ``EWOULDBLOCK``, chained from *exc*.
        """
        # Compare the exception's errno attribute, not the exception object
        # itself: an exception instance never equals an integer constant.
        if getattr(exc, 'errno', None) in (errno.EAGAIN, errno.EWOULDBLOCK):
            logger.error('Error received: %s', exc)
        else:
            raise IOError("Unexpected connection error") from exc