Skip to content
This repository has been archived by the owner on Dec 2, 2023. It is now read-only.

Commit

Permalink
fix(biz): race condition for room & peer (#567)
Browse files Browse the repository at this point in the history
* fix(biz): race condition for room & peer

1. When the same client rejoins with the same uid, delPeer in defer of exiting goroutine may delete the newly created peer with the same uid.
2. Fix RWMutex use in BizServer.createRoom()/.getRoom()
3. Room.getPeers() return was not thread-safe to iterate

* chore: go mod tidy

* fix(biz): race conditon in delRoom/createRoom

* fix(biz): reduce goroutine churn for peer.send()

* fix(biz): pass github actions

* lint(ion): disable grpc.CustomCodec deprecated

Can not migrate nrpc.Codec to encoding.RegisterCodec() yet
  • Loading branch information
hn8 authored Jul 2, 2021
1 parent 83ab6c7 commit 05380cc
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 318 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ jobs:
- uses: actions/checkout@v2

- name: build
run: make build
run: make all
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: biz docker
name: app-biz docker
on:
push:
branches:
Expand All @@ -17,15 +17,15 @@ jobs:
- uses: actions/checkout@v2

- name: build
run: docker build --tag pionwebrtc/ion:latest-biz -f docker/biz.Dockerfile .
run: docker build --tag pionwebrtc/ion:latest-app-biz -f docker/app-biz.Dockerfile .

- name: login
if: github.event_name == 'release'
run: echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin

- name: tag
if: github.event_name == 'release'
run: docker tag pionwebrtc/ion:latest-biz pionwebrtc/ion:"$TAG"-biz
run: docker tag pionwebrtc/ion:latest-app-biz pionwebrtc/ion:"$TAG"-app-biz
env:
TAG: ${{ github.event.release.tag_name }}

Expand Down
2 changes: 1 addition & 1 deletion apps/biz/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ package main
// ============================================================================
// GRPC & Protobuf
//go:generate /usr/bin/env bash -c "echo 'Generating [biz] protobuf and grpc services for Go, outdir=$OUTDIR'"
//go:generate protoc ./proto/biz.proto -I./proto -I../../ --go_opt=module=github.com/pion/ion --go_out=../../$OUTDIR --go-grpc_opt=module=github.com/pion/ion --go-grpc_out=../../$OUTDIR
//go:generate /usr/bin/env bash -c "if command -v protoc &>/dev/null; then protoc ./proto/biz.proto -I./proto -I../../ --go_opt=module=github.com/pion/ion --go_out=../../$OUTDIR --go-grpc_opt=module=github.com/pion/ion --go-grpc_out=../../$OUTDIR; fi"
15 changes: 8 additions & 7 deletions apps/biz/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ func NewPeer(sid string, uid string, info []byte, senCh chan *biz.SignalReply) *
info: info,
sndCh: senCh,
}
p.closed.Set(false)
return p
}

// Close peer
func (p *Peer) Close() {
if p.closed.Get() {
if !p.closed.Set(true) {
return
}
p.closed.Set(true)
}

// UID return peer uid
Expand All @@ -46,9 +44,13 @@ func (p *Peer) SID() string {
}

func (p *Peer) send(data *biz.SignalReply) error {
go func() {
p.sndCh <- data
}()
select {
case p.sndCh <- data:
default:
go func() {
p.sndCh <- data
}()
}
return nil
}

Expand All @@ -58,7 +60,6 @@ func (p *Peer) sendPeerEvent(event *ion.PeerEvent) error {
PeerEvent: event,
},
}

return p.send(data)
}

Expand Down
39 changes: 25 additions & 14 deletions apps/biz/server/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,39 @@ func (r *Room) getPeer(uid string) *Peer {
}

// getPeers get peers in the room
func (r *Room) getPeers() map[string]*Peer {
func (r *Room) getPeers() []*Peer {
r.RLock()
defer r.RUnlock()
return r.peers
p := make([]*Peer, 0, len(r.peers))
for _, peer := range r.peers {
p = append(p, peer)
}
return p
}

// delPeer delete a peer in the room
func (r *Room) delPeer(uid string) int {
func (r *Room) delPeer(p *Peer) int {
uid := p.uid
r.Lock()
delete(r.peers, uid)
found := r.peers[uid] == p
if found {
delete(r.peers, uid)
}
peerCount := len(r.peers)
r.Unlock()

event := &ion.PeerEvent{
State: ion.PeerEvent_LEAVE,
Peer: &ion.Peer{
Sid: r.sid,
Uid: uid,
},
if found {
event := &ion.PeerEvent{
State: ion.PeerEvent_LEAVE,
Peer: &ion.Peer{
Sid: r.sid,
Uid: uid,
},
}
r.sendPeerEvent(event)
}
r.sendPeerEvent(event)

return len(r.peers)
return peerCount
}

// count return count of peers in room
Expand Down Expand Up @@ -135,8 +146,8 @@ func (r *Room) sendMessage(msg *ion.Message) {
data := msg.Data
log.Debugf("Room.onMessage %v => %v, data: %v", from, to, data)
peers := r.getPeers()
for id, p := range peers {
if id == to || to == "all" || to == r.sid {
for _, p := range peers {
if to == p.uid || to == "all" || to == r.sid {
if err := p.sendMessage(msg); err != nil {
log.Errorf("send msg to peer(%s) error: %v", p.uid, err)
}
Expand Down
53 changes: 28 additions & 25 deletions apps/biz/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type BizServer struct {
nc *nats.Conn
roomLock sync.RWMutex
rooms map[string]*Room
closed chan bool
closed chan struct{}
ndc *ndc.Client
islbcli islb.ISLBClient
bn *BIZ
Expand All @@ -45,7 +45,7 @@ func newBizServer(bn *BIZ, c string, nid string, nc *nats.Conn) (*BizServer, err
bn: bn,
nc: nc,
rooms: make(map[string]*Room),
closed: make(chan bool),
closed: make(chan struct{}),
stream: nil,
}

Expand All @@ -57,24 +57,29 @@ func (s *BizServer) close() {
}

func (s *BizServer) createRoom(sid string, sfuNID string) *Room {
s.roomLock.RLock()
defer s.roomLock.RUnlock()
s.roomLock.Lock()
defer s.roomLock.Unlock()
if r := s.rooms[sid]; r != nil {
return r
}
r := newRoom(sid, sfuNID)
s.rooms[sid] = r
return r
}

func (s *BizServer) getRoom(id string) *Room {
s.roomLock.Lock()
defer s.roomLock.Unlock()
r := s.rooms[id]
return r
s.roomLock.RLock()
defer s.roomLock.RUnlock()
return s.rooms[id]
}

func (s *BizServer) delRoom(id string) {
func (s *BizServer) delRoom(r *Room) {
id := r.SID()
s.roomLock.Lock()
defer s.roomLock.Unlock()
delete(s.rooms, id)
if s.rooms[id] == r {
delete(s.rooms, id)
}
}

func (s *BizServer) watchISLBEvent(nid string, sid string) error {
Expand Down Expand Up @@ -141,18 +146,18 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
var r *Room = nil
var peer *Peer = nil
errCh := make(chan error)
repCh := make(chan *biz.SignalReply)
repCh := make(chan *biz.SignalReply, 1)
reqCh := make(chan *biz.SignalRequest)

defer func() {
if peer != nil && r != nil {
peer.Close()
r.delPeer(peer.UID())
}

if r != nil && r.count() == 0 {
s.delRoom(r.SID())
r = nil
if r != nil {
if peer != nil {
peer.Close()
r.delPeer(peer)
}
if r.count() == 0 {
s.delRoom(r)
}
}

log.Infof("BizServer.Signal loop done")
Expand Down Expand Up @@ -245,14 +250,12 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
case *biz.SignalRequest_Leave:
uid := payload.Leave.Uid
if peer != nil && peer.uid == uid {
r.delPeer(uid)
peer.Close()
peer = nil

if r.count() == 0 {
s.delRoom(r.SID())
if r.delPeer(peer) == 0 {
s.delRoom(r)
r = nil
}
peer.Close()
peer = nil

err := stream.Send(&biz.SignalReply{
Payload: &biz.SignalReply_LeaveReply{
Expand Down
2 changes: 1 addition & 1 deletion cmd/signal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func main() {
defer sig.Close()

srv := grpc.NewServer(
grpc.CustomCodec(nrpc.Codec()),
grpc.CustomCodec(nrpc.Codec()), // nolint:staticcheck
grpc.UnknownServiceHandler(nproxy.TransparentHandler(sig.Director)))

s := server.NewWrapperedGRPCWebServer(options, srv)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/go-redis/redis/v7 v7.4.0
github.com/improbable-eng/grpc-web v0.13.0
github.com/jhump/protoreflect v1.8.2
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
github.com/pion/ion-avp v1.8.4
github.com/pion/ion-log v1.2.0
Expand Down
Loading

0 comments on commit 05380cc

Please sign in to comment.