diff --git a/Dockerfile b/Dockerfile index 7e15ad6..521361b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ ## ## Build ## -FROM golang:1.19 AS build +FROM golang:1.20 AS build WORKDIR /app diff --git a/go.mod b/go.mod index 97da679..00fe616 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/matrix-org/waterfall -go 1.19 +go 1.20 require github.com/pion/webrtc/v3 v3.1.31 diff --git a/pkg/conference/publisher/publisher.go b/pkg/conference/publisher/publisher.go index 7e7789e..e30e0cf 100644 --- a/pkg/conference/publisher/publisher.go +++ b/pkg/conference/publisher/publisher.go @@ -8,6 +8,7 @@ import ( "github.com/pion/rtp" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" ) var ErrSubscriptionExists = errors.New("subscription already exists") @@ -77,15 +78,17 @@ func NewPublisher( return publisher, observer.statusCh } -func (p *Publisher) AddSubscription(subscription Subscription) { +func (p *Publisher) AddSubscription(subscriptions ...Subscription) { p.mu.Lock() defer p.mu.Unlock() - if _, ok := p.subscriptions[subscription]; ok { - return - } + for _, s := range subscriptions { + if _, ok := p.subscriptions[s]; ok { + continue + } - p.subscriptions[subscription] = struct{}{} + p.subscriptions[s] = struct{}{} + } } func (p *Publisher) RemoveSubscription(subscription Subscription) { @@ -94,6 +97,16 @@ func (p *Publisher) RemoveSubscription(subscription Subscription) { delete(p.subscriptions, subscription) } +func (p *Publisher) DrainSubscriptions() []Subscription { + p.mu.Lock() + defer p.mu.Unlock() + + subscriptions := maps.Keys(p.subscriptions) + maps.Clear(p.subscriptions) + + return subscriptions +} + func (p *Publisher) GetTrack() Track { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/conference/track/internal.go b/pkg/conference/track/internal.go index 2912028..7de78b1 100644 --- a/pkg/conference/track/internal.go +++ b/pkg/conference/track/internal.go @@ -1,6 +1,8 @@ package track import ( + "fmt" + "github.com/matrix-org/waterfall/pkg/conference/publisher" "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/matrix-org/waterfall/pkg/worker" @@ -49,28 +51,59 @@ func forward(sender *webrtc.TrackRemote, receiver *webrtc.TrackLocalStaticRTP, s } func (p *PublishedTrack[SubscriberID]) addVideoPublisher(track *webrtc.TrackRemote) { + // Detect simulcast layer of a publisher and create loggers and scoped telemetry. simulcast := webrtc_ext.RIDToSimulcastLayer(track.RID()) - pub, statusCh := publisher.NewPublisher( - &publisher.RemoteTrack{track}, - p.stopPublishers, - p.logger.WithField("layer", simulcast), - ) + layerTelemetry := p.telemetry.CreateChild("layer", attribute.String("layer", simulcast.String())) + layerLogger := p.logger.WithField("layer", simulcast.String()) + // Create a new publisher for the track. + pub, statusCh := publisher.NewPublisher(&publisher.RemoteTrack{track}, p.stopPublishers, layerLogger) p.video.publishers[simulcast] = pub - defer p.telemetry.AddEvent("video publisher started", attribute.String("simulcast", simulcast.String())) - - // Listen on `done` and remove the track once it's done. + // Observe the status of the publisher. p.activePublishers.Add(1) go func() { + // Once this go-routine is done, inform that this publisher is stopped. defer p.activePublishers.Done() - defer p.telemetry.AddEvent("video publisher stopped", attribute.String("simulcast", simulcast.String())) + defer layerTelemetry.End() + + // Observe publisher's status events. + for status := range statusCh { + switch status { + // Publisher is not active (no packets received for a while). + case publisher.StatusStalled: + p.mutex.Lock() + defer p.mutex.Unlock() + + // Let's check if we're muted. If we are, it's ok to not receive packets. + if p.metadata.Muted { + layerLogger.Info("Publisher is stalled but we're muted, ignoring") + layerTelemetry.AddEvent("muted") + continue + } + + // Otherwise, remove all subscriptions and switch them to the lowest layer if available. + // We assume that the lowest layer is the latest to fail (normally, lowest layer always + // receive packets even if other layers are stalled). + subscriptions := pub.DrainSubscriptions() + lowLayer := p.video.publishers[webrtc_ext.SimulcastLayerLow] + if lowLayer != nil { + layerLogger.Info("Publisher is stalled, switching to the lowest layer") + layerTelemetry.AddEvent("stalled, switched to the low layer") + lowLayer.AddSubscription(subscriptions...) + continue + } + + // Otherwise, we have no other layer to switch to. Bummer. + layerLogger.Warn("Publisher is stalled and we have no other layer to switch to") + layerTelemetry.Fail(fmt.Errorf("stalled")) + continue - // Wait for the channel to be closed. Ignore statuses for now. - for { - _, closed := <-statusCh - if closed { - break + // Publisher is active again (new packets received). + case publisher.StatusRecovered: + // Currently, we don't have any actions when the publisher is recovered, i.e. we + // do not switch subscriptions that **used to be subscribed to this layer** back. + // But we may want to do it once we have congestion control and bandwidth allocation. } }