Skip to content

Commit

Permalink
track: handle stalled published tracks
Browse files Browse the repository at this point in the history
So that when the layer on a published track stalled, we can change
subscriptions to expect receiving packets from a different track
layer (different simulcast layer) within the same published track.

Solves #131.
  • Loading branch information
daniel-abramov committed Apr 3, 2023
1 parent 7e401a5 commit 0ed2a1c
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 377 deletions.
16 changes: 4 additions & 12 deletions pkg/conference/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/pion/rtp"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)

var ErrSubscriptionExists = errors.New("subscription already exists")
Expand Down Expand Up @@ -91,20 +90,13 @@ func (p *Publisher) AddSubscription(subscriptions ...Subscription) {
}
}

func (p *Publisher) RemoveSubscription(subscription Subscription) {
func (p *Publisher) RemoveSubscription(subscription ...Subscription) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.subscriptions, subscription)
}

func (p *Publisher) DrainSubscriptions() []Subscription {
p.mu.Lock()
defer p.mu.Unlock()

subscriptions := maps.Keys(p.subscriptions)
maps.Clear(p.subscriptions)

return subscriptions
for _, s := range subscription {
delete(p.subscriptions, s)
}
}

func (p *Publisher) GetTrack() Track {
Expand Down
12 changes: 0 additions & 12 deletions pkg/conference/subscription/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"

"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)
Expand Down Expand Up @@ -42,17 +41,6 @@ func (s *AudioSubscription) WriteRTP(packet rtp.Packet) error {
return fmt.Errorf("Bug: no write RTP logic for an audio subscription!")
}

func (s *AudioSubscription) SwitchLayer(simulcast webrtc_ext.SimulcastLayer) {
}

func (s *AudioSubscription) Simulcast() webrtc_ext.SimulcastLayer {
return webrtc_ext.SimulcastLayerNone
}

func (s *AudioSubscription) UpdateMuteState(muted bool) {
// We don't have any business logic at the moment for audio subscriptions.
}

func (s *AudioSubscription) readRTCP() {
// Read incoming RTCP packets. Before these packets are returned they are processed by interceptors.
// For things like NACK this needs to be called.
Expand Down
4 changes: 0 additions & 4 deletions pkg/conference/subscription/subscription.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package subscription

import (
"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)

type Subscription interface {
Unsubscribe() error
WriteRTP(packet rtp.Packet) error
SwitchLayer(simulcast webrtc_ext.SimulcastLayer)
Simulcast() webrtc_ext.SimulcastLayer
UpdateMuteState(muted bool)
}

type SubscriptionController interface {
Expand Down
149 changes: 49 additions & 100 deletions pkg/conference/subscription/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,50 @@ import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
)

type RequestKeyFrameFn = func(simulcast webrtc_ext.SimulcastLayer) error

type VideoSubscription struct {
rtpSender *webrtc.RTPSender

info webrtc_ext.TrackInfo

currentLayer atomic.Int32 // atomic webrtc_ext.SimulcastLayer
muted atomic.Bool // we don't expect any RTP packets
stalled atomic.Bool // we do expect RTP packets, but haven't received for a while

controller SubscriptionController
requestKeyFrameFn RequestKeyFrameFn
worker *worker.Worker[rtp.Packet]
controller SubscriptionController
worker *worker.Worker[rtp.Packet]
stopped atomic.Bool

logger *logrus.Entry
telemetry *telemetry.Telemetry
}

type KeyFrameRequest struct{}

// Creates a new video subscription. Returns a subscription along with a channel
// that informs the parent about key frame requests from the subscriptions. When the
// channel is closed, the subscription's go-routine is stopped.
func NewVideoSubscription(
info webrtc_ext.TrackInfo,
simulcast webrtc_ext.SimulcastLayer,
muted bool,
controller SubscriptionController,
requestKeyFrameFn RequestKeyFrameFn,
logger *logrus.Entry,
telemetryBuilder *telemetry.ChildBuilder,
) (*VideoSubscription, error) {
) (*VideoSubscription, <-chan KeyFrameRequest, error) {
// Create a new track.
rtpTrack, err := webrtc.NewTrackLocalStaticRTP(info.Codec, info.TrackID, info.StreamID)
if err != nil {
return nil, fmt.Errorf("Failed to create track: %s", err)
return nil, nil, fmt.Errorf("Failed to create track: %v", err)
}

rtpSender, err := controller.AddTrack(rtpTrack)
if err != nil {
return nil, fmt.Errorf("Failed to add track: %s", err)
return nil, nil, fmt.Errorf("Failed to add track: %v", err)
}

// Atomic version of the webrtc_ext.SimulcastLayer.
var currentLayer atomic.Int32
currentLayer.Store(int32(simulcast))

// By default we assume that the track is not muted.
var mutedState atomic.Bool
mutedState.Store(muted)

// Also, the track is not stalled by default.
var stalled atomic.Bool

// Create a subscription.
subscription := &VideoSubscription{
rtpSender,
info,
currentLayer,
mutedState,
stalled,
controller,
requestKeyFrameFn,
nil,
atomic.Bool{},
logger,
telemetryBuilder.Create("VideoSubscription"),
}
Expand All @@ -90,100 +71,68 @@ func NewVideoSubscription(

// Configure the worker for the subscription.
workerConfig := worker.Config[rtp.Packet]{
ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes.
Timeout: 3 * time.Second, // When do we assume the subscription is stalled.
OnTimeout: func() {
// Not receiving RTP packets for 3 seconds can happen either if we're muted (not an error),
// or if the peer does not send any data (that's a problem that potentially means a freeze).
// Also, we don't want to execute this part if the subscription has already been marked as stalled.
if !subscription.muted.Load() && !subscription.stalled.Load() {
layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load())
logger.Warnf("No RTP on subscription to %s (%s) for 3 seconds", subscription.info.TrackID, layer)
subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 3 seconds on %s", layer))
subscription.stalled.Store(true)
}
},
OnTask: workerState.handlePacket,
ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes.
Timeout: 1 * time.Hour,
OnTimeout: func() {},
OnTask: workerState.handlePacket,
}

// Start a worker for the subscription and create a subsription.
subscription.worker = worker.StartWorker(workerConfig)

// Start reading and forwarding RTCP packets.
go subscription.readRTCP()

// Request a key frame, so that we can get it from the publisher right after subscription.
subscription.requestKeyFrame()

subscription.telemetry.AddEvent("subscribed", attribute.String("layer", simulcast.String()))
// Start reading and forwarding RTCP packets goroutine.
ch := subscription.startReadRTCP()

return subscription, nil
return subscription, ch, nil
}

func (s *VideoSubscription) Unsubscribe() error {
if !s.stopped.CompareAndSwap(false, true) {
return fmt.Errorf("Already stopped")
}

s.worker.Stop()
s.logger.Infof("Unsubscribing from %s (%s)", s.info.TrackID, webrtc_ext.SimulcastLayer(s.currentLayer.Load()))
s.logger.Info("Unsubscribed")
s.telemetry.End()
return s.controller.RemoveTrack(s.rtpSender)
}

func (s *VideoSubscription) WriteRTP(packet rtp.Packet) error {
if s.stalled.CompareAndSwap(true, false) {
simulcast := webrtc_ext.SimulcastLayer(s.currentLayer.Load())
s.logger.Infof("Recovered subscription to %s (%s)", s.info.TrackID, simulcast)
s.telemetry.AddEvent("subscription recovered")
}

// Send the packet to the worker.
return s.worker.Send(packet)
}

func (s *VideoSubscription) SwitchLayer(simulcast webrtc_ext.SimulcastLayer) {
s.logger.Infof("Switching layer on %s to %s", s.info.TrackID, simulcast)
s.telemetry.AddEvent("switching simulcast layer", attribute.String("layer", simulcast.String()))
s.currentLayer.Store(int32(simulcast))
s.requestKeyFrameFn(simulcast)
}

func (s *VideoSubscription) TrackInfo() webrtc_ext.TrackInfo {
return s.info
}

func (s *VideoSubscription) Simulcast() webrtc_ext.SimulcastLayer {
return webrtc_ext.SimulcastLayer(s.currentLayer.Load())
}

func (s *VideoSubscription) UpdateMuteState(muted bool) {
s.muted.Store(muted)
}

// Read incoming RTCP packets. Before these packets are returned they are processed by interceptors.
func (s *VideoSubscription) readRTCP() {
for {
packets, _, err := s.rtpSender.ReadRTCP()
if err != nil {
if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) {
layer := webrtc_ext.SimulcastLayer(s.currentLayer.Load())
s.logger.Debugf("failed to read RTCP on track: %s (%s): %s", s.info.TrackID, layer, err)
s.telemetry.AddEvent("subscription stopped")
s.worker.Stop()
return
func (s *VideoSubscription) startReadRTCP() <-chan KeyFrameRequest {
ch := make(chan KeyFrameRequest)

go func() {
defer close(ch)
defer s.Unsubscribe()
defer s.telemetry.AddEvent("Stopped")
defer s.logger.Info("Stopped")

for {
packets, _, err := s.rtpSender.ReadRTCP()
if err != nil {
if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) {
s.logger.Debugf("Failed to read RTCP: %v", err)
return
}
}
}

// We only want to inform others about PLIs and FIRs. We skip the rest of the packets for now.
for _, packet := range packets {
switch packet.(type) {
// For simplicity we assume that any of the key frame requests is just a key frame request.
case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest:
s.requestKeyFrame()
// We only want to inform others about PLIs and FIRs. We skip the rest of the packets for now.
for _, packet := range packets {
switch packet.(type) {
// For simplicity we assume that any of the key frame requests is just a key frame request.
case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest:
ch <- KeyFrameRequest{}
}
}
}
}
}
}()

func (s *VideoSubscription) requestKeyFrame() {
s.requestKeyFrameFn(webrtc_ext.SimulcastLayer(s.currentLayer.Load()))
return ch
}

// Internal state of a worker that runs in its own goroutine.
Expand Down
Loading

0 comments on commit 0ed2a1c

Please sign in to comment.