Skip to content

Commit 244f699

Browse files
authored
bebop: fix concurrent map writes (hybridgroup#1063)
1 parent cb1f952 commit 244f699

File tree

2 files changed

+100
-81
lines changed

2 files changed

+100
-81
lines changed

platforms/parrot/bebop/client/client.go

+37-81
Original file line numberDiff line numberDiff line change
@@ -107,50 +107,6 @@ func NewNetworkFrame(buf []byte) NetworkFrame {
107107
return frame
108108
}
109109

110-
func networkFrameGenerator() func(*bytes.Buffer, byte, byte) *bytes.Buffer {
111-
// func networkFrameGenerator() func(*bytes.Buffer, byte, byte) NetworkFrame {
112-
//
113-
// ARNETWORKAL_Frame_t
114-
//
115-
// uint8 type - frame type ARNETWORK_FRAME_TYPE
116-
// uint8 id - identifier of the buffer sending the frame
117-
// uint8 seq - sequence number of the frame
118-
// uint32 size - size of the frame
119-
//
120-
121-
// each frame id has it's own sequence number
122-
seq := make(map[byte]byte)
123-
124-
hlen := 7 // size of ARNETWORKAL_Frame_t header
125-
126-
return func(cmd *bytes.Buffer, frameType byte, id byte) *bytes.Buffer {
127-
if _, ok := seq[id]; !ok {
128-
seq[id] = 0
129-
}
130-
131-
seq[id]++
132-
133-
if seq[id] > 255 {
134-
seq[id] = 0
135-
}
136-
137-
ret := &bytes.Buffer{}
138-
ret.WriteByte(frameType)
139-
ret.WriteByte(id)
140-
ret.WriteByte(seq[id])
141-
142-
size := &bytes.Buffer{}
143-
if err := binary.Write(size, binary.LittleEndian, uint32(cmd.Len()+hlen)); err != nil {
144-
panic(err)
145-
}
146-
147-
ret.Write(size.Bytes())
148-
ret.Write(cmd.Bytes())
149-
150-
return ret
151-
}
152-
}
153-
154110
type Pcmd struct {
155111
Flag int
156112
Roll int
@@ -161,33 +117,33 @@ type Pcmd struct {
161117
}
162118

163119
type Bebop struct {
164-
IP string
165-
NavData map[string]string
166-
Pcmd Pcmd
167-
tmpFrame tmpFrame
168-
C2dPort int
169-
D2cPort int
170-
RTPStreamPort int
171-
RTPControlPort int
172-
DiscoveryPort int
173-
c2dClient *net.UDPConn
174-
d2cClient *net.UDPConn
175-
discoveryClient *net.TCPConn
176-
networkFrameGenerator func(*bytes.Buffer, byte, byte) *bytes.Buffer
177-
video chan []byte
178-
writeChan chan []byte
120+
IP string
121+
NavData map[string]string
122+
Pcmd Pcmd
123+
tmpFrame tmpFrame
124+
C2dPort int
125+
D2cPort int
126+
RTPStreamPort int
127+
RTPControlPort int
128+
DiscoveryPort int
129+
c2dClient *net.UDPConn
130+
d2cClient *net.UDPConn
131+
discoveryClient *net.TCPConn
132+
nwFrameGenerator *nwFrameGenerator
133+
video chan []byte
134+
writeChan chan []byte
179135
}
180136

181137
func New() *Bebop {
182138
return &Bebop{
183-
IP: "192.168.42.1",
184-
NavData: make(map[string]string),
185-
C2dPort: 54321,
186-
D2cPort: 43210,
187-
RTPStreamPort: 55004,
188-
RTPControlPort: 55005,
189-
DiscoveryPort: 44444,
190-
networkFrameGenerator: networkFrameGenerator(),
139+
IP: "192.168.42.1",
140+
NavData: make(map[string]string),
141+
C2dPort: 54321,
142+
D2cPort: 43210,
143+
RTPStreamPort: 55004,
144+
RTPControlPort: 55005,
145+
DiscoveryPort: 44444,
146+
nwFrameGenerator: newNetworkFrameGenerator(),
191147
Pcmd: Pcmd{
192148
Flag: 0,
193149
Roll: 0,
@@ -332,7 +288,7 @@ func (b *Bebop) FlatTrim() error {
332288

333289
cmd.Write(tmp.Bytes())
334290

335-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
291+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
336292
}
337293

338294
func (b *Bebop) GenerateAllStates() error {
@@ -352,7 +308,7 @@ func (b *Bebop) GenerateAllStates() error {
352308

353309
cmd.Write(tmp.Bytes())
354310

355-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
311+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
356312
}
357313

358314
func (b *Bebop) TakeOff() error {
@@ -372,7 +328,7 @@ func (b *Bebop) TakeOff() error {
372328

373329
cmd.Write(tmp.Bytes())
374330

375-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
331+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
376332
}
377333

378334
func (b *Bebop) Land() error {
@@ -392,7 +348,7 @@ func (b *Bebop) Land() error {
392348

393349
cmd.Write(tmp.Bytes())
394350

395-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
351+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
396352
}
397353

398354
func (b *Bebop) Up(val int) error {
@@ -516,7 +472,7 @@ func (b *Bebop) generatePcmd() *bytes.Buffer {
516472
}
517473
cmd.Write(tmp.Bytes())
518474

519-
return b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID)
475+
return b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID)
520476
}
521477

522478
func (b *Bebop) createAck(frame NetworkFrame) *bytes.Buffer {
@@ -528,14 +484,14 @@ func (b *Bebop) createAck(frame NetworkFrame) *bytes.Buffer {
528484
// libARNetwork/Sources/ARNETWORK_Manager.h#ARNETWORK_Manager_IDOutputToIDAck
529485
//
530486

531-
return b.networkFrameGenerator(bytes.NewBuffer([]byte{uint8(frame.Seq)}),
487+
return b.nwFrameGenerator.generate(bytes.NewBuffer([]byte{uint8(frame.Seq)}),
532488
ARNETWORKAL_FRAME_TYPE_ACK,
533489
byte(uint16(frame.Id)+(ARNETWORKAL_MANAGER_DEFAULT_ID_MAX/2)),
534490
)
535491
}
536492

537493
func (b *Bebop) createPong(frame NetworkFrame) *bytes.Buffer {
538-
return b.networkFrameGenerator(bytes.NewBuffer(frame.Data),
494+
return b.nwFrameGenerator.generate(bytes.NewBuffer(frame.Data),
539495
ARNETWORKAL_FRAME_TYPE_DATA,
540496
ARNETWORK_MANAGER_INTERNAL_BUFFER_ID_PONG,
541497
)
@@ -579,13 +535,13 @@ func (b *Bebop) packetReceiver(buf []byte) {
579535
func (b *Bebop) StartRecording() error {
580536
buf := b.videoRecord(ARCOMMANDS_ARDRONE3_MEDIARECORD_VIDEO_RECORD_START)
581537

582-
return b.write(b.networkFrameGenerator(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
538+
return b.write(b.nwFrameGenerator.generate(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
583539
}
584540

585541
func (b *Bebop) StopRecording() error {
586542
buf := b.videoRecord(ARCOMMANDS_ARDRONE3_MEDIARECORD_VIDEO_RECORD_STOP)
587543

588-
return b.write(b.networkFrameGenerator(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
544+
return b.write(b.nwFrameGenerator.generate(buf, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
589545
}
590546

591547
func (b *Bebop) videoRecord(state byte) *bytes.Buffer {
@@ -650,7 +606,7 @@ func (b *Bebop) HullProtection(protect bool) error {
650606
}
651607
cmd.Write(tmp.Bytes())
652608

653-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
609+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
654610
}
655611

656612
func (b *Bebop) Outdoor(outdoor bool) error {
@@ -679,7 +635,7 @@ func (b *Bebop) Outdoor(outdoor bool) error {
679635
}
680636
cmd.Write(tmp.Bytes())
681637

682-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
638+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
683639
}
684640

685641
func (b *Bebop) VideoEnable(enable bool) error {
@@ -704,7 +660,7 @@ func (b *Bebop) VideoEnable(enable bool) error {
704660
}
705661
cmd.Write(tmp.Bytes())
706662

707-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
663+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
708664
}
709665

710666
func (b *Bebop) VideoStreamMode(mode int8) error {
@@ -729,7 +685,7 @@ func (b *Bebop) VideoStreamMode(mode int8) error {
729685
}
730686
cmd.Write(tmp.Bytes())
731687

732-
return b.write(b.networkFrameGenerator(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
688+
return b.write(b.nwFrameGenerator.generate(cmd, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_NONACK_ID).Bytes())
733689
}
734690

735691
func bool2int8(b bool) int8 {
@@ -826,5 +782,5 @@ func (b *Bebop) createARStreamACK(frame ARStreamFrame) *bytes.Buffer {
826782
}
827783
ackPacket.Write(tmp.Bytes())
828784

829-
return b.networkFrameGenerator(ackPacket, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_VIDEO_ACK_ID)
785+
return b.nwFrameGenerator.generate(ackPacket, ARNETWORKAL_FRAME_TYPE_DATA, BD_NET_CD_VIDEO_ACK_ID)
830786
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package client
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"sync"
7+
)
8+
9+
type nwFrameGenerator struct {
10+
seq map[byte]byte
11+
hlen int
12+
mutex *sync.Mutex
13+
}
14+
15+
func newNetworkFrameGenerator() *nwFrameGenerator {
16+
nwg := nwFrameGenerator{
17+
seq: make(map[byte]byte), // each frame id has it's own sequence number
18+
hlen: 7, // size of ARNETWORKAL_Frame_t header
19+
mutex: &sync.Mutex{},
20+
}
21+
return &nwg
22+
}
23+
24+
// generate the "NetworkFrame" as bytes buffer
25+
func (nwg *nwFrameGenerator) generate(cmd *bytes.Buffer, frameType byte, id byte) *bytes.Buffer {
26+
nwg.mutex.Lock()
27+
defer nwg.mutex.Unlock()
28+
29+
// func networkFrameGenerator() func(*bytes.Buffer, byte, byte) NetworkFrame {
30+
//
31+
// ARNETWORKAL_Frame_t
32+
//
33+
// uint8 type - frame type ARNETWORK_FRAME_TYPE
34+
// uint8 id - identifier of the buffer sending the frame
35+
// uint8 seq - sequence number of the frame
36+
// uint32 size - size of the frame
37+
//
38+
39+
if _, ok := nwg.seq[id]; !ok {
40+
nwg.seq[id] = 0
41+
}
42+
43+
nwg.seq[id]++
44+
45+
if nwg.seq[id] > 255 {
46+
nwg.seq[id] = 0
47+
}
48+
49+
ret := &bytes.Buffer{}
50+
ret.WriteByte(frameType)
51+
ret.WriteByte(id)
52+
ret.WriteByte(nwg.seq[id])
53+
54+
size := &bytes.Buffer{}
55+
if err := binary.Write(size, binary.LittleEndian, uint32(cmd.Len()+nwg.hlen)); err != nil {
56+
panic(err)
57+
}
58+
59+
ret.Write(size.Bytes())
60+
ret.Write(cmd.Bytes())
61+
62+
return ret
63+
}

0 commit comments

Comments
 (0)