Skip to content

Commit

Permalink
track: detect stalled publishers and switch layer
Browse files Browse the repository at this point in the history
Relates to #131.
  • Loading branch information
daniel-abramov committed Apr 3, 2023
1 parent e4bafcc commit 7e401a5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
##
## Build
##
FROM golang:1.19 AS build
FROM golang:1.20 AS build

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/matrix-org/waterfall

go 1.19
go 1.20

require github.com/pion/webrtc/v3 v3.1.31

Expand Down
23 changes: 18 additions & 5 deletions pkg/conference/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pion/rtp"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)

var ErrSubscriptionExists = errors.New("subscription already exists")
Expand Down Expand Up @@ -77,15 +78,17 @@ func NewPublisher(
return publisher, observer.statusCh
}

func (p *Publisher) AddSubscription(subscription Subscription) {
func (p *Publisher) AddSubscription(subscriptions ...Subscription) {
p.mu.Lock()
defer p.mu.Unlock()

if _, ok := p.subscriptions[subscription]; ok {
return
}
for _, s := range subscriptions {
if _, ok := p.subscriptions[s]; ok {
continue
}

p.subscriptions[subscription] = struct{}{}
p.subscriptions[s] = struct{}{}
}
}

func (p *Publisher) RemoveSubscription(subscription Subscription) {
Expand All @@ -94,6 +97,16 @@ func (p *Publisher) RemoveSubscription(subscription Subscription) {
delete(p.subscriptions, subscription)
}

// DrainSubscriptions detaches every subscription currently registered on the
// publisher and hands them back to the caller, leaving the publisher with an
// empty subscription set. The whole operation runs under the publisher's
// mutex. The returned slice has no particular order because it is built from
// map keys.
func (p *Publisher) DrainSubscriptions() []Subscription {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Snapshot the current subscribers first, then forget each of them.
	drained := maps.Keys(p.subscriptions)
	for _, s := range drained {
		delete(p.subscriptions, s)
	}

	return drained
}

func (p *Publisher) GetTrack() Track {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
61 changes: 47 additions & 14 deletions pkg/conference/track/internal.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package track

import (
"fmt"

"github.com/matrix-org/waterfall/pkg/conference/publisher"
"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/matrix-org/waterfall/pkg/worker"
Expand Down Expand Up @@ -49,28 +51,59 @@ func forward(sender *webrtc.TrackRemote, receiver *webrtc.TrackLocalStaticRTP, s
}

func (p *PublishedTrack[SubscriberID]) addVideoPublisher(track *webrtc.TrackRemote) {
// Detect simulcast layer of a publisher and create loggers and scoped telemetry.
simulcast := webrtc_ext.RIDToSimulcastLayer(track.RID())
pub, statusCh := publisher.NewPublisher(
&publisher.RemoteTrack{track},
p.stopPublishers,
p.logger.WithField("layer", simulcast),
)
layerTelemetry := p.telemetry.CreateChild("layer", attribute.String("layer", simulcast.String()))
layerLogger := p.logger.WithField("layer", simulcast.String())

// Create a new publisher for the track.
pub, statusCh := publisher.NewPublisher(&publisher.RemoteTrack{track}, p.stopPublishers, layerLogger)
p.video.publishers[simulcast] = pub

defer p.telemetry.AddEvent("video publisher started", attribute.String("simulcast", simulcast.String()))

// Listen on `done` and remove the track once it's done.
// Observe the status of the publisher.
p.activePublishers.Add(1)
go func() {
// Once this go-routine is done, inform that this publisher is stopped.
defer p.activePublishers.Done()
defer p.telemetry.AddEvent("video publisher stopped", attribute.String("simulcast", simulcast.String()))
defer layerTelemetry.End()

// Observe publisher's status events.
for status := range statusCh {
switch status {
// Publisher is not active (no packets received for a while).
case publisher.StatusStalled:
p.mutex.Lock()
defer p.mutex.Unlock()

// Let's check if we're muted. If we are, it's ok to not receive packets.
if p.metadata.Muted {
layerLogger.Info("Publisher is stalled but we're muted, ignoring")
layerTelemetry.AddEvent("muted")
continue
}

// Otherwise, remove all subscriptions and switch them to the lowest layer if available.
// We assume that the lowest layer is the last to fail (normally, the lowest layer always
// receives packets even if other layers are stalled).
subscriptions := pub.DrainSubscriptions()
lowLayer := p.video.publishers[webrtc_ext.SimulcastLayerLow]
if lowLayer != nil {
layerLogger.Info("Publisher is stalled, switching to the lowest layer")
layerTelemetry.AddEvent("stalled, switched to the low layer")
lowLayer.AddSubscription(subscriptions...)
continue
}

// Otherwise, we have no other layer to switch to. Bummer.
layerLogger.Warn("Publisher is stalled and we have no other layer to switch to")
layerTelemetry.Fail(fmt.Errorf("stalled"))
continue

// Wait for the channel to be closed. Ignore statuses for now.
for {
_, closed := <-statusCh
if closed {
break
// Publisher is active again (new packets received).
case publisher.StatusRecovered:
// Currently, we don't have any actions when the publisher is recovered, i.e. we
// do not switch subscriptions that **used to be subscribed to this layer** back.
// But we may want to do it once we have congestion control and bandwidth allocation.
}
}

Expand Down

0 comments on commit 7e401a5

Please sign in to comment.