-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathartnet-receiver.py
132 lines (91 loc) · 3.94 KB
/
artnet-receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import sys
import time
from socket import (socket, AF_INET, SOCK_DGRAM)
from struct import unpack
import optparse
import blinkytape
# Command-line options.
# The default BlinkyTape port on a Raspberry Pi is /dev/ttyACM0.  Device paths
# are case-sensitive on Linux, so the default has to be spelled exactly so.
parser = optparse.OptionParser()
parser.add_option("-p", "--port", dest="portname",
                  help="serial port (ex: /dev/ttyACM0)", default="/dev/ttyACM0")
parser.add_option("-l", "--length", dest="length",
                  help="number of LEDs attached to the BlinkyTape controller",
                  type=int, default=64)
(options, args) = parser.parse_args()

# Network settings.
UDP_IP = ""  # empty host = bind to every local interface (INADDR_ANY)
UDP_PORT = 0x1936  # 6454 -- Art-Net is supposed to only use this port

# Layout of one universe's worth of pixel data.
PIXELS_PER_UNIVERSE = 170  # Number of pixels to expect on a universe
BYTES_PER_PIXEL = 3

# BlinkyTape settings taken from the command line.
BLINKYTAPE_DEVICE = options.portname
BLINKYTAPE_LENGTH = options.length
class ArtnetPacket:
    """Parsed fields of one Art-Net DMX output packet.

    Instances are normally produced by :meth:`unpack_raw_artnet_packet`.
    Every attribute is ``None`` until that parser fills it in.
    """

    # Datagrams that do not start with this 8-byte ID are rejected.
    ARTNET_HEADER = b'Art-Net\x00'
    # Value of the opcode field (bytes 8-9) for a DMX data packet when those
    # two bytes are decoded big-endian, which is how the parser reads them.
    OP_OUTPUT = 0x0050
    # Number of bytes before the DMX payload: 8-byte ID + 10 bytes of fields.
    HEADER_SIZE = 18

    def __init__(self):
        self.op_code = None
        self.ver = None
        self.sequence = None
        self.physical = None
        self.universe = None
        self.length = None
        self.data = None

    @staticmethod
    def unpack_raw_artnet_packet(raw_data):
        """Decode a raw UDP payload into an ``ArtnetPacket``.

        Args:
            raw_data: the bytes of one received datagram.

        Returns:
            A populated ``ArtnetPacket``, or ``None`` when the datagram is
            not a DMX output packet or is malformed: too short to hold the
            18-byte header, wrong ID, wrong opcode, or fewer payload bytes
            than the length field announces.
        """
        header_size = ArtnetPacket.HEADER_SIZE

        # The datagram comes straight off the network, so check sizes before
        # every unpack(); a struct.error here would otherwise escape to the
        # caller and abort the receive loop.
        if len(raw_data) < header_size:
            return None

        if unpack('!8s', raw_data[:8])[0] != ArtnetPacket.ARTNET_HEADER:
            return None

        # We can only handle data packets
        (op_code,) = unpack('!H', raw_data[8:10])
        if op_code != ArtnetPacket.OP_OUTPUT:
            return None

        packet = ArtnetPacket()
        (packet.op_code, packet.ver, packet.sequence, packet.physical,
         packet.universe, packet.length) = unpack(
             '!HHBBHH', raw_data[8:header_size])

        # The universe field is little-endian, unlike the fields decoded
        # above, so it is read a second time with the right byte order.
        (packet.universe,) = unpack('<H', raw_data[14:16])

        payload = raw_data[header_size:header_size + packet.length]
        if len(payload) != packet.length:
            # Truncated datagram: the header promises more data than arrived.
            return None
        packet.data = bytes(payload)

        return packet
def _assemble_frame(universes, universe_size):
    """Join per-universe buffers into one bytearray with a fixed stride.

    Each entry of *universes* contributes exactly *universe_size* bytes:
    longer buffers are truncated, shorter (or empty) ones are zero-padded.
    """
    frame = bytearray()
    for universe_data in universes:
        chunk = universe_data[:universe_size]
        frame.extend(chunk)
        # bytearray(n) is n zero bytes; pads short universes, no-op when full.
        frame.extend(bytearray(universe_size - len(chunk)))
    return frame


def blinkytape_artnet_receiver():
    """Receive Art-Net DMX packets over UDP and forward them to a BlinkyTape.

    Binds a UDP socket to ``(UDP_IP, UDP_PORT)``, opens the tape on
    ``BLINKYTAPE_DEVICE`` and then loops forever.  The latest data of every
    universe seen so far is kept; whenever a packet for the highest-numbered
    known universe arrives, all universes are concatenated (each trimmed or
    zero-padded to ``PIXELS_PER_UNIVERSE * BYTES_PER_PIXEL`` bytes) and sent
    to the tape.  Datagrams for which the parser returns ``None`` are
    skipped.  A packets-per-second figure is printed roughly once a second.

    The loop ends on Ctrl-C (``KeyboardInterrupt``), which exits the process
    via ``sys.exit()``.  The socket is closed on every exit path.
    """
    print(("Listening in {0}:{1}").format(UDP_IP, UDP_PORT))

    universe_size = PIXELS_PER_UNIVERSE * BYTES_PER_PIXEL

    sock = socket(AF_INET, SOCK_DGRAM)  # UDP
    try:
        sock.bind((UDP_IP, UDP_PORT))
        bt = blinkytape.BlinkyTape(BLINKYTAPE_DEVICE, BLINKYTAPE_LENGTH)

        packet_count = 0
        last_report = time.time()
        datas = []  # datas[u] holds the most recent payload of universe u

        while True:
            raw, _addr = sock.recvfrom(1024)
            packet = ArtnetPacket.unpack_raw_artnet_packet(raw)

            if packet is not None:
                packet_count += 1

                # Grow the table so that the packet's universe has a slot.
                # Placeholders are empty bytearrays (not str) so they can be
                # padded and concatenated like real payloads.
                while len(datas) < packet.universe + 1:
                    print("adding new universe %i" % (packet.universe))
                    datas.append(bytearray())

                datas[packet.universe] = bytearray(packet.data)

                # Push a frame to the tape when the last universe arrives.
                # The sequence number is intentionally not compared: some
                # Art-Net senders don't provide sequence updates.
                if packet.universe == len(datas) - 1:
                    frame = _assemble_frame(datas, universe_size)
                    # NOTE(review): sendData is handed a str with one
                    # character per byte, as before -- confirm this is the
                    # type the blinkytape module expects.
                    bt.sendData("".join(map(chr, frame)))

            if time.time() > last_report + 1:
                print("Packets per second: %i" % (packet_count))
                packet_count = 0
                last_report = time.time()
    except KeyboardInterrupt:
        sys.exit()
    finally:
        # Runs for Ctrl-C as well as for unexpected errors.
        sock.close()
# Script entry point: start the receiver loop as soon as this line is reached.
blinkytape_artnet_receiver()