-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransmitter.go
163 lines (152 loc) · 4.92 KB
/
transmitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package lcm
import (
"bytes"
"context"
"fmt"
"net"
"golang.org/x/net/ipv4"
"golang.org/x/net/nettest"
"google.golang.org/protobuf/proto"
)
// Transmitter represents an LCM Transmitter instance.
//
// A Transmitter is created by DialMulticastUDP and owns the underlying
// connection until Close is called.
//
// NOTE(review): Transmit and TransmitProtoOnChannel mutate the scratch fields
// below (msg, payloadBuf, protoBuf, sequenceNumber) and no synchronization is
// visible in this file, so concurrent use presumably needs external locking —
// confirm.
type Transmitter struct {
// opts holds the options the transmitter was dialed with.
opts *transmitterOptions
// conn is the IPv4 packet connection all messages are written to.
conn *ipv4.PacketConn
// sequenceNumber is stamped on each outgoing message and then incremented.
sequenceNumber uint32
// messageBuf holds one ipv4.Message per destination address; every entry is
// pointed at the same marshalled payload before a write.
messageBuf []ipv4.Message
// payloadBuf is the scratch buffer each outgoing message is marshalled into.
payloadBuf [lengthOfLargestUDPMessage]byte
// protoBuf is the scratch buffer used when marshalling protobuf messages.
protoBuf bytes.Buffer
// msg is the scratch message that is filled in on every Transmit call.
msg Message
}
// Compressor is an interface for an LCM message compressor.
//
// When a compressor is registered for a channel, Transmit sends the output of
// Compress instead of the raw payload and sets the message params to
// "z=" followed by Name.
type Compressor interface {
// Compress returns the compressed form of data, or an error. An error makes
// Transmit fail without sending anything.
Compress(data []byte) ([]byte, error)
// Name returns the identifier that is sent after "z=" in the message params.
Name() string
}
// DialMulticastUDP returns a Transmitter configured with the provided options.
//
// It opens an unbound "udp4" packet socket, applies the multicast TTL,
// outgoing interface and loopback settings from the options, and prepares one
// outgoing message slot per destination address. When no interface name is
// configured, an interface is picked with getMulticastInterface. When no
// addresses are configured, the default multicast IP and port are used.
//
// The context is only used while opening the socket. If any configuration
// step fails, the socket is closed before the error is returned, so the caller
// only has to Close the Transmitter on success.
func DialMulticastUDP(ctx context.Context, transmitterOpts ...TransmitterOption) (*Transmitter, error) {
	opts := defaultTransmitterOptions()
	for _, transmitterOpt := range transmitterOpts {
		transmitterOpt(opts)
	}
	var listenConfig net.ListenConfig
	c, err := listenConfig.ListenPacket(ctx, "udp4", "")
	if err != nil {
		return nil, fmt.Errorf("dial multicast UDP: %w", err)
	}
	// Use a checked assertion so an unexpected connection type is reported as
	// an error instead of panicking inside library code.
	udpConn, ok := c.(*net.UDPConn)
	if !ok {
		_ = c.Close()
		return nil, fmt.Errorf("dial multicast UDP: unexpected packet connection type %T", c)
	}
	conn := ipv4.NewPacketConn(udpConn)
	// abort releases the socket before reporting a configuration failure;
	// without it every error path below would leak the file descriptor.
	abort := func(cause error) (*Transmitter, error) {
		_ = conn.Close() // best effort: the configuration error is the one worth reporting
		return nil, cause
	}
	if err := conn.SetMulticastTTL(opts.ttl); err != nil {
		return abort(fmt.Errorf("dial multicast UDP: %w", err))
	}
	var ifi *net.Interface
	if opts.interfaceName != "" {
		ifi, err = net.InterfaceByName(opts.interfaceName)
		if err != nil {
			return abort(fmt.Errorf("dial multicast UDP: failed to lookup provided if: %w", err))
		}
	} else {
		ifi, err = getMulticastInterface()
		if err != nil {
			return abort(fmt.Errorf("dial multicast UDP: failed to lookup multicast if: %w", err))
		}
	}
	if err := conn.SetMulticastInterface(ifi); err != nil {
		return abort(fmt.Errorf("dial multicast UDP: %w", err))
	}
	if err := conn.SetMulticastLoopback(opts.loopback); err != nil {
		return abort(fmt.Errorf("dial multicast UDP: %w", err))
	}
	tx := &Transmitter{opts: opts, conn: conn}
	if len(opts.addrs) == 0 {
		opts.addrs = append(opts.addrs, &net.UDPAddr{IP: DefaultMulticastIP(), Port: DefaultPort})
	}
	// One message slot per destination; the single buffer entry is pointed at
	// the marshalled payload on every Transmit call.
	for _, addr := range opts.addrs {
		tx.messageBuf = append(tx.messageBuf, ipv4.Message{
			Buffers: [][]byte{nil},
			Addr:    addr,
		})
	}
	return tx, nil
}
// getMulticastInterface selects the interface to transmit on when none was
// configured explicitly.
//
// It first asks nettest.RoutedInterface for an "ip4" interface carrying the
// up, multicast and loopback flags; if that lookup fails, it falls back to
// requesting only the up and multicast flags and returns that result as-is.
func getMulticastInterface() (*net.Interface, error) {
	const upAndMulticast = net.FlagUp | net.FlagMulticast
	if loopback, err := nettest.RoutedInterface("ip4", upAndMulticast|net.FlagLoopback); err == nil {
		return loopback, nil
	}
	return nettest.RoutedInterface("ip4", upAndMulticast)
}
// TransmitProto transmits a protobuf message, using the message's
// fully-qualified name as the channel.
//
// An error is returned when the name reported by the message descriptor is
// not valid; otherwise the call is delegated to TransmitProtoOnChannel.
func (t *Transmitter) TransmitProto(ctx context.Context, m proto.Message) error {
	fullName := m.ProtoReflect().Descriptor().FullName()
	if fullName.IsValid() {
		return t.TransmitProtoOnChannel(ctx, string(fullName), m)
	}
	return fmt.Errorf("unable to derive name of proto message: %v", fullName)
}
// TransmitProtoOnChannel marshals m to the protobuf wire format and transmits
// the result on the given channel.
//
// Marshalling errors are returned wrapped with the channel name; transmission
// errors are returned as reported by Transmit.
func (t *Transmitter) TransmitProtoOnChannel(ctx context.Context, channel string, m proto.Message) error {
	// Reuse the transmitter's buffer so a new byte slice is not allocated for
	// every message. MarshalAppend can only reuse memory when the slice it is
	// given has spare capacity, and nothing is ever written into protoBuf
	// directly, so the capacity has to be reserved explicitly: after Reset and
	// Grow, Bytes returns an empty slice backed by at least Size(m) bytes.
	t.protoBuf.Reset()
	t.protoBuf.Grow(proto.Size(m))
	b, err := proto.MarshalOptions{}.MarshalAppend(t.protoBuf.Bytes(), m)
	if err != nil {
		return fmt.Errorf("transmit proto on channel %s: %w", channel, err)
	}
	return t.Transmit(ctx, channel, b)
}
// Transmit sends a raw payload on the given channel to every configured
// destination address.
//
// When a compressor is registered for the channel, the payload is compressed
// first and the message params are set to "z=" followed by the compressor
// name; otherwise the payload is sent unchanged with empty params. Each
// message that reaches the marshalling step consumes one sequence number.
//
// If the provided context has a deadline, it will be propagated to the underlying write operation.
func (t *Transmitter) Transmit(ctx context.Context, channel string, data []byte) error {
	// Resolve payload and params up front so the scratch message is left
	// untouched when compression fails.
	payload, params := data, ""
	if c := t.opts.compressor[channel]; c != nil {
		compressed, err := c.Compress(data)
		if err != nil {
			return fmt.Errorf("transmit compress: %w", err)
		}
		payload, params = compressed, "z="+c.Name()
	}
	t.msg.Data = payload
	t.msg.Params = params
	t.msg.Channel = channel
	t.msg.SequenceNumber = t.sequenceNumber
	t.sequenceNumber++

	size, err := t.msg.marshal(t.payloadBuf[:])
	if err != nil {
		return fmt.Errorf("transmit to LCM: %w", err)
	}

	// Every destination shares the same marshalled packet.
	packet := t.payloadBuf[:size]
	for i := range t.messageBuf {
		slot := &t.messageBuf[i]
		slot.Buffers[0] = packet
		slot.N = size
	}

	// A context without a deadline yields the zero time here.
	deadline, _ := ctx.Deadline()
	if err := t.conn.SetWriteDeadline(deadline); err != nil {
		return fmt.Errorf("transmit to LCM: %w", err)
	}

	// fast-path: a single destination needs only one plain write
	if len(t.messageBuf) == 1 {
		_, err := t.conn.WriteTo(packet, nil, t.messageBuf[0].Addr)
		if err != nil {
			return fmt.Errorf("transmit to LCM: %w", err)
		}
		return nil
	}

	// several destinations: keep batching until every message has been written,
	// since a single WriteBatch call may send only part of the slice
	for sent := 0; sent < len(t.messageBuf); {
		written, err := t.conn.WriteBatch(t.messageBuf[sent:], 0)
		if err != nil {
			return fmt.Errorf("transmit to LCM: %w", err)
		}
		sent += written
	}
	return nil
}
// Close closes the transmitter's underlying connection.
//
// Any error reported by the connection is returned wrapped.
func (t *Transmitter) Close() error {
	err := t.conn.Close()
	if err == nil {
		return nil
	}
	return fmt.Errorf("close LCM transmitter: %w", err)
}