From a43948931329bdea510a6df7a4e43dd3bd90d130 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Tue, 31 Jan 2023 22:15:39 +0100 Subject: [PATCH 01/17] peer: remove unnecessary accidental complexity There was no point requesting the key frame by sending a message to the conference as the only thing that the conference did was calling a single function from the peer, so it was completely wasting the CPU time. Instead, we could directly call this function and write to the peer connection. The function is thread-safe since all mutable state is protected by a mutex and the peer connection has a mutex inside. --- pkg/conference/participant/tracker.go | 44 +++++++++-------------- pkg/conference/peer_message_processing.go | 6 ---- pkg/conference/processing.go | 2 -- pkg/peer/messages.go | 5 --- pkg/peer/peer.go | 8 ++--- 5 files changed, 19 insertions(+), 46 deletions(-) diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 40012de..619e7f6 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -1,8 +1,6 @@ package participant import ( - "fmt" - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/peer/subscription" "github.com/pion/rtp" @@ -181,26 +179,34 @@ func (t *Tracker) Subscribe(participantID ID, requests []SubscribeRequest) { err error ) + published := t.FindPublishedTrack(request.TrackID) + if published == nil { + participant.Logger.Errorf("Can't subscribe to non-existent track %s", request.TrackID) + continue + } + switch request.Kind { case webrtc.RTPCodecTypeVideo: + owner := t.GetParticipant(published.Owner) + if owner == nil { + participant.Logger.Errorf("Can't subscribe to non-existent owner %s", published.Owner) + continue + } + sub, err = subscription.NewVideoSubscription( request.TrackInfo, request.Simulcast, participant.Peer, func(track common.TrackInfo, simulcast common.SimulcastLayer) error { - return participant.Peer.RequestKeyFrame(track, simulcast) + return owner.Peer.RequestKeyFrame(track, simulcast) }, participant.Logger, ) case webrtc.RTPCodecTypeAudio: - if published := t.FindPublishedTrack(request.TrackID); published != nil { - sub, err = subscription.NewAudioSubscription( - published.OutputTrack, - participant.Peer, - ) - } else { - err = fmt.Errorf("Can't subscribe to non-existent track %s", request.TrackID) - } + sub, err = subscription.NewAudioSubscription( + published.OutputTrack, + participant.Peer, + ) } if err != nil { @@ -261,19 +267,3 @@ func (t *Tracker) ProcessRTP(info common.TrackInfo, simulcast common.SimulcastLa } } } - -// Processes RTCP packets received on a given track. -func (t *Tracker) ProcessKeyFrameRequest(info common.TrackInfo, simulcast common.SimulcastLayer) error { - published, found := t.publishedTracks[info.TrackID] - if !found { - return fmt.Errorf("no such track: %s", info.TrackID) - } - - participant := t.GetParticipant(published.Owner) - if participant == nil { - return fmt.Errorf("no such participant: %s", published.Owner) - } - - // We don't want to send keyframes too often, so we'll send them only once in a while. - return participant.Peer.WritePLI(info, simulcast) -} diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index cca8c9f..13b49ee 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -147,12 +147,6 @@ func (c *Conference) processDataChannelAvailableMessage(sender participant.ID, m }) } -func (c *Conference) processKeyFrameRequest(msg peer.KeyFrameRequestReceived) { - if err := c.tracker.ProcessKeyFrameRequest(msg.TrackInfo, msg.SimulcastLayer); err != nil { - c.logger.Errorf("Failed to process RTCP on %s (%s): %s", msg.TrackID, msg.SimulcastLayer, err) - } -} - // Handle the `FocusEvent` from the DataChannel message. func (c *Conference) processTrackSubscriptionMessage( p *participant.Participant, diff --git a/pkg/conference/processing.go b/pkg/conference/processing.go index 0b8a27c..ec6f650 100644 --- a/pkg/conference/processing.go +++ b/pkg/conference/processing.go @@ -62,8 +62,6 @@ func (c *Conference) processPeerMessage(message common.Message[participant.ID, p c.processDataChannelMessage(message.Sender, msg) case peer.DataChannelAvailable: c.processDataChannelAvailableMessage(message.Sender, msg) - case peer.KeyFrameRequestReceived: - c.processKeyFrameRequest(msg) default: c.logger.Errorf("Unknown message type: %T", msg) } diff --git a/pkg/peer/messages.go b/pkg/peer/messages.go index 2064c49..d8a7aac 100644 --- a/pkg/peer/messages.go +++ b/pkg/peer/messages.go @@ -54,8 +54,3 @@ type DataChannelMessage struct { } type DataChannelAvailable struct{} - -type KeyFrameRequestReceived struct { - common.TrackInfo - SimulcastLayer common.SimulcastLayer -} diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index d8efc51..948cf82 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -83,8 +83,8 @@ func (p *Peer[ID]) Terminate() { p.sink.Seal() } -// Writes the specified packets to the `trackID`. -func (p *Peer[ID]) WritePLI(info common.TrackInfo, simulcast common.SimulcastLayer) error { +// Request a key frame from the peer connection. +func (p *Peer[ID]) RequestKeyFrame(info common.TrackInfo, simulcast common.SimulcastLayer) error { // Find the right track. track := p.state.GetRemoteTrack(info.TrackID, simulcast) if track == nil { @@ -171,7 +171,3 @@ func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription, return &answer, nil } - -func (p *Peer[ID]) RequestKeyFrame(info common.TrackInfo, simulcast common.SimulcastLayer) error { - return p.sink.TrySend(KeyFrameRequestReceived{info, simulcast}) -} From 8d0a14e4f8e2097ce8eb2b12782df64957f208dd Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Tue, 31 Jan 2023 22:22:23 +0100 Subject: [PATCH 02/17] message_sink: clarify the meaning of `sealed` --- pkg/common/message_sink.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/common/message_sink.go b/pkg/common/message_sink.go index 6853032..ad62110 100644 --- a/pkg/common/message_sink.go +++ b/pkg/common/message_sink.go @@ -15,11 +15,10 @@ type MessageSink[SenderType comparable, MessageType any] struct { sender SenderType // The message sink to which the messages are sent. messageSink chan<- Message[SenderType, MessageType] - // Atomic variable that indicates whether the message sink is sealed. - // Basically it means that **the current sender** (but not other senders) - // won't be able to send any more messages to the message sink. The difference - // between this and the channel being closed is that the closed channel is not - // available for writing for all senders. + // A variable that indicates whether the messages could be sent. It's akin + // to a close indication without really closing the channel. We don't want to close + // the channel here since we know that the sink is shared between multiple producers, + // so we only disallow sending to the sink at this point. sealed atomic.Bool } From 4386f4d68b021e8fbde857ebc99520416f245944 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Tue, 31 Jan 2023 22:30:50 +0100 Subject: [PATCH 03/17] conference: remove a buffer in peer messages sink --- pkg/conference/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 381cebf..9fc53db 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -48,7 +48,7 @@ func StartConference( tracker: *participant.NewParticipantTracker(), streamsMetadata: make(event.CallSDPStreamMetadata), endNotifier: conferenceEndNotifier, - peerMessages: make(chan common.Message[participant.ID, peer.MessageContent], common.UnboundedChannelSize), + peerMessages: make(chan common.Message[participant.ID, peer.MessageContent]), matrixMessages: receiver, } From 11d83f6bd530b1158e66173f90fdc2aa9d39e29f Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 06:30:32 +0100 Subject: [PATCH 04/17] message_sink: refactor the usage of seal logic --- pkg/common/{message_sink.go => sink.go} | 56 ++++++++------------- pkg/conference/matrix_message_processing.go | 2 +- pkg/peer/peer.go | 4 +- 3 files changed, 25 insertions(+), 37 deletions(-) rename pkg/common/{message_sink.go => sink.go} (58%) diff --git a/pkg/common/message_sink.go b/pkg/common/sink.go similarity index 58% rename from pkg/common/message_sink.go rename to pkg/common/sink.go index ad62110..710f8d6 100644 --- a/pkg/common/message_sink.go +++ b/pkg/common/sink.go @@ -2,64 +2,47 @@ package common import ( "errors" - "sync/atomic" ) -// MessageSink is a helper struct that allows to send messages to a message sink. -// The MessageSink abstracts the message sink which has a certain sender, so that +var ErrSinkSealed = errors.New("The channel is sealed") + +// SinkWithSender is a helper struct that allows to send messages to a message sink. +// The SinkWithSender abstracts the message sink which has a certain sender, so that // the sender does not have to be specified every time a message is sent. // At the same it guarantees that the caller can't alter the `sender`, which means that // the sender can't impersonate another sender (and we guarantee this on a compile-time). -type MessageSink[SenderType comparable, MessageType any] struct { +type SinkWithSender[SenderType comparable, MessageType any] struct { // The sender of the messages. This is useful for multiple-producer-single-consumer scenarios. sender SenderType // The message sink to which the messages are sent. messageSink chan<- Message[SenderType, MessageType] - // A variable that indicates whether the messages could be sent. It's akin + // A channel that is used to indicate that our channel is considered sealed. It's akin // to a close indication without really closing the channel. We don't want to close // the channel here since we know that the sink is shared between multiple producers, // so we only disallow sending to the sink at this point. - sealed atomic.Bool + sealed chan struct{} } // Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases. -func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] { - return &MessageSink[S, M]{ +func NewSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *SinkWithSender[S, M] { + return &SinkWithSender[S, M]{ sender: sender, messageSink: messageSink, + sealed: make(chan struct{}), } } // Sends a message to the message sink. Blocks if the sink is full! -func (s *MessageSink[S, M]) Send(message M) error { - return s.send(message, false) -} - -// Sends a message to the message sink. Does **not** block if the sink is full, returns an error instead. -func (s *MessageSink[S, M]) TrySend(message M) error { - return s.send(message, true) -} - -// Sends a message to the message sink. -func (s *MessageSink[S, M]) send(message M, nonBlocking bool) error { - if s.sealed.Load() { - return errors.New("The channel is sealed, you can't send any messages over it") - } - +func (s *SinkWithSender[S, M]) Send(message M) error { messageWithSender := Message[S, M]{ Sender: s.sender, Content: message, } - if nonBlocking { - select { - case s.messageSink <- messageWithSender: - return nil - default: - return errors.New("The channel is full, can't send without blocking") - } - } else { - s.messageSink <- messageWithSender + select { + case <-s.sealed: + return ErrSinkSealed + case s.messageSink <- messageWithSender: return nil } } @@ -68,8 +51,13 @@ func (s *MessageSink[S, M]) send(message M, nonBlocking bool) error { // Any attempt to send a message would result in an error. This is similar to closing the // channel except that we don't close the underlying channel (since there might be other // senders that may want to use it). -func (s *MessageSink[S, M]) Seal() { - s.sealed.Store(true) +func (s *SinkWithSender[S, M]) Seal() { + select { + case <-s.sealed: + return + default: + close(s.sealed) + } } // Messages that are sent from the peer to the conference in order to communicate with other peers. diff --git a/pkg/conference/matrix_message_processing.go b/pkg/conference/matrix_message_processing.go index a1d548d..da1ec24 100644 --- a/pkg/conference/matrix_message_processing.go +++ b/pkg/conference/matrix_message_processing.go @@ -48,7 +48,7 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call } sdpAnswer = answer } else { - messageSink := common.NewMessageSink(id, c.peerMessages) + messageSink := common.NewSink(id, c.peerMessages) peerConnection, answer, err := peer.NewPeer(c.connectionFactory, inviteEvent.Offer.SDP, messageSink, logger) if err != nil { diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index 948cf82..a17b196 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -31,7 +31,7 @@ var ( type Peer[ID comparable] struct { logger *logrus.Entry peerConnection *webrtc.PeerConnection - sink *common.MessageSink[ID, MessageContent] + sink *common.SinkWithSender[ID, MessageContent] state *state.PeerState } @@ -39,7 +39,7 @@ type Peer[ID comparable] struct { func NewPeer[ID comparable]( connectionFactory *webrtc_ext.PeerConnectionFactory, sdpOffer string, - sink *common.MessageSink[ID, MessageContent], + sink *common.SinkWithSender[ID, MessageContent], logger *logrus.Entry, ) (*Peer[ID], *webrtc.SessionDescription, error) { peerConnection, err := connectionFactory.CreatePeerConnection() From 36b3fc7ccc5b95b71038bd3ccf6316cd95fbea7d Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 06:50:58 +0100 Subject: [PATCH 05/17] subscription: reduce the buffer on worker This solves the problem with managing the queue size on the receiver. --- pkg/conference/participant/tracker.go | 8 ++------ pkg/peer/subscription/video.go | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 619e7f6..49733a0 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -255,14 +255,10 @@ func (t *Tracker) Unsubscribe(participantID ID, tracks []TrackID) { // Processes an RTP packet received on a given track. func (t *Tracker) ProcessRTP(info common.TrackInfo, simulcast common.SimulcastLayer, packet *rtp.Packet) { - for participantID, subscription := range t.subscribers[info.TrackID] { + for _, subscription := range t.subscribers[info.TrackID] { if subscription.Simulcast() == simulcast { if err := subscription.WriteRTP(*packet); err != nil { - if participant := t.GetParticipant(participantID); participant != nil { - participant.Logger.Errorf("Error writing RTP to %s (%s): %s", info.TrackID, simulcast, err) - continue - } - logrus.Errorf("Bug: subscription without subscriber") + logrus.Errorf("Dropping an RTP packet on %s (%s): %s", info.TrackID, simulcast, err) } } } diff --git a/pkg/peer/subscription/video.go b/pkg/peer/subscription/video.go index babb02a..9738d63 100644 --- a/pkg/peer/subscription/video.go +++ b/pkg/peer/subscription/video.go @@ -70,7 +70,7 @@ func NewVideoSubscription( // Configure the worker for the subscription. workerConfig := common.WorkerConfig[rtp.Packet]{ - ChannelSize: 100, // Approx. 500ms of buffer size, we don't need more + ChannelSize: 1, // We really don't want to buffer old packets. Timeout: 2 * time.Second, OnTimeout: func() { layer := common.SimulcastLayer(subscription.currentLayer.Load()) From 5376a7a383df4c7b169d522bc8946e7c9e3d7060 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 06:53:02 +0100 Subject: [PATCH 06/17] router: remove the buffer on incoming channel It's ok if the matrix SDK blocks until we're ready to accept new messages. --- pkg/routing/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/routing/router.go b/pkg/routing/router.go index e568ff2..da1f3f5 100644 --- a/pkg/routing/router.go +++ b/pkg/routing/router.go @@ -53,7 +53,7 @@ func NewRouter( matrix: matrix, conferenceSinks: make(map[string]*common.Sender[conf.MatrixMessage]), config: config, - channel: make(chan RouterMessage, common.UnboundedChannelSize), + channel: make(chan RouterMessage), connectionFactory: connectionFactory, } From b03eb1a016226d44b33a2709fc4e2c64c9f8c943 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 07:46:41 +0100 Subject: [PATCH 07/17] heartbeat: refactor heartbeat channel usage --- pkg/common/worker_test.go | 2 +- pkg/conference/matrix_message_processing.go | 24 +++++++++---------- .../participant}/heartbeat.go | 12 +++++----- pkg/conference/participant/participant.go | 3 +-- pkg/conference/participant/tracker.go | 2 +- pkg/conference/peer_message_processing.go | 9 ++++--- 6 files changed, 24 insertions(+), 28 deletions(-) rename pkg/{common => conference/participant}/heartbeat.go (85%) diff --git a/pkg/common/worker_test.go b/pkg/common/worker_test.go index a765d41..57b10bd 100644 --- a/pkg/common/worker_test.go +++ b/pkg/common/worker_test.go @@ -9,7 +9,7 @@ import ( func BenchmarkWorker(b *testing.B) { workerConfig := common.WorkerConfig[struct{}]{ - ChannelSize: common.UnboundedChannelSize, + ChannelSize: 1, Timeout: 2 * time.Second, OnTimeout: func() {}, OnTask: func(struct{}) {}, diff --git a/pkg/conference/matrix_message_processing.go b/pkg/conference/matrix_message_processing.go index da1ec24..428b03c 100644 --- a/pkg/conference/matrix_message_processing.go +++ b/pkg/conference/matrix_message_processing.go @@ -56,18 +56,16 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call return err } - heartbeat := common.Heartbeat{ - Interval: time.Duration(c.config.HeartbeatConfig.Interval) * time.Second, - Timeout: time.Duration(c.config.HeartbeatConfig.Timeout) * time.Second, - SendPing: func() bool { - return p.SendDataChannelMessage(event.Event{ - Type: event.FocusCallPing, - Content: event.Content{}, - }) == nil - }, - OnTimeout: func() { - messageSink.Send(peer.LeftTheCall{event.CallHangupKeepAliveTimeout}) - }, + pingEvent := event.Event{ + Type: event.FocusCallPing, + Content: event.Content{}, + } + + heartbeat := participant.HeartbeatConfig{ + Interval: time.Duration(c.config.HeartbeatConfig.Interval) * time.Second, + Timeout: time.Duration(c.config.HeartbeatConfig.Timeout) * time.Second, + SendPing: func() bool { return p.SendDataChannelMessage(pingEvent) == nil }, + OnTimeout: func() { messageSink.Send(peer.LeftTheCall{event.CallHangupKeepAliveTimeout}) }, } p = &participant.Participant{ @@ -75,7 +73,7 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call Peer: peerConnection, Logger: logger, RemoteSessionID: inviteEvent.SenderSessionID, - HeartbeatPong: heartbeat.Start(), + Pong: heartbeat.Start(), } c.tracker.AddParticipant(p) diff --git a/pkg/common/heartbeat.go b/pkg/conference/participant/heartbeat.go similarity index 85% rename from pkg/common/heartbeat.go rename to pkg/conference/participant/heartbeat.go index f2f3146..165aaae 100644 --- a/pkg/common/heartbeat.go +++ b/pkg/conference/participant/heartbeat.go @@ -1,4 +1,4 @@ -package common +package participant import ( "time" @@ -6,8 +6,8 @@ import ( type Pong struct{} -// Heartbeat defines the configuration for a heartbeat. -type Heartbeat struct { +// HeartbeatConfig defines the configuration for a heartbeat. +type HeartbeatConfig struct { // How often to send pings. Interval time.Duration // After which time to consider the communication stalled. @@ -23,8 +23,8 @@ type Heartbeat struct { // on `PongChannel` for `Timeout`. If no response is received within `Timeout`, `OnTimeout` is called. // The goroutine stops once the channel is closed or upon handling the `OnTimeout`. The returned channel // is what the caller should use to inform about the reception of a pong. -func (h *Heartbeat) Start() chan<- Pong { - pong := make(chan Pong, UnboundedChannelSize) +func (h *HeartbeatConfig) Start() chan<- Pong { + pong := make(chan Pong, 1) go func() { ticker := time.NewTicker(h.Interval) @@ -52,7 +52,7 @@ func (h *Heartbeat) Start() chan<- Pong { // Tries to send a ping message using `SendPing` and retry it if it fails. // Returns `true` if the ping was sent successfully. -func (h *Heartbeat) sendWithRetry() bool { +func (h *HeartbeatConfig) sendWithRetry() bool { const retries = 3 retryInterval := h.Timeout / retries diff --git a/pkg/conference/participant/participant.go b/pkg/conference/participant/participant.go index 808f978..0532ff5 100644 --- a/pkg/conference/participant/participant.go +++ b/pkg/conference/participant/participant.go @@ -3,7 +3,6 @@ package participant import ( "fmt" - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" "github.com/sirupsen/logrus" @@ -25,7 +24,7 @@ type Participant struct { Logger *logrus.Entry Peer *peer.Peer[ID] RemoteSessionID id.SessionID - HeartbeatPong chan<- common.Pong + Pong chan<- Pong } func (p *Participant) AsMatrixRecipient() signaling.MatrixRecipient { diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 49733a0..1c26779 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -61,7 +61,7 @@ func (t *Tracker) RemoveParticipant(participantID ID) map[string]bool { // Terminate the participant and remove it from the list. participant.Peer.Terminate() - close(participant.HeartbeatPong) + close(participant.Pong) delete(t.participants, participantID) // Remove the participant's tracks from all participants who might have subscribed to them. diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index 13b49ee..22ccaed 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -1,7 +1,6 @@ package conference import ( - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" @@ -243,10 +242,10 @@ func (c *Conference) processNegotiateMessage(p *participant.Participant, msg eve } func (c *Conference) processPongMessage(p *participant.Participant) { - // New heartbeat received (keep-alive message that is periodically sent by the remote peer). - // We need to update the last heartbeat time. If the peer is not active for too long, we will - // consider peer's connection as stalled and will close it. - p.HeartbeatPong <- common.Pong{} + select { + case p.Pong <- participant.Pong{}: + default: + } } func (c *Conference) processMetadataMessage( From 948657547640e110d4f6779e93a1bb47ed2942bc Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 07:51:37 +0100 Subject: [PATCH 08/17] channel: get rid of last `UnboundedChannelSize` --- pkg/common/channel.go | 11 ++--------- pkg/conference/start.go | 2 +- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/pkg/common/channel.go b/pkg/common/channel.go index 4be62af..c30932f 100644 --- a/pkg/common/channel.go +++ b/pkg/common/channel.go @@ -2,18 +2,11 @@ package common import "sync/atomic" -// In Go, unbounded channel means something different than what it means in Rust. -// I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**, -// meaning that each attempt to send will block the channel until the receiver -// reads it. Majority of primitives here in `waterfall` are designed under assumption -// that sending is not blocking. -const UnboundedChannelSize = 512 - // Creates a new channel, returns two counterparts of it where one can only send and another can only receive. // Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail // to send any messages to the channel over `Send“. -func NewChannel[M any]() (Sender[M], Receiver[M]) { - channel := make(chan M, UnboundedChannelSize) +func NewChannel[M any](channelSize int) (Sender[M], Receiver[M]) { + channel := make(chan M, channelSize) closed := &atomic.Bool{} sender := Sender[M]{channel, closed} receiver := Receiver[M]{channel, closed} diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 9fc53db..b09bd9e 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -37,7 +37,7 @@ func StartConference( userID id.UserID, inviteEvent *event.CallInviteEventContent, ) (*common.Sender[MatrixMessage], error) { - sender, receiver := common.NewChannel[MatrixMessage]() + sender, receiver := common.NewChannel[MatrixMessage](128) conference := &Conference{ id: confID, From 93153238390d1cd40f96b12f2c142fc434d83c45 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 1 Feb 2023 08:06:40 +0100 Subject: [PATCH 09/17] router: refactor the router in a more idiomatic Go Use unbounded channels for that. We can't quite use it for the multi-conference set ups as a single conference will affect others, but currently we don't have these problems. --- cmd/sfu/main.go | 11 +- pkg/.DS_Store | Bin 0 -> 6148 bytes pkg/common/channel.go | 63 ----------- pkg/conference/matrix_message_processing.go | 5 +- pkg/conference/processing.go | 16 +-- pkg/conference/start.go | 21 ++-- pkg/conference/state.go | 12 +- pkg/routing/router.go | 118 +++++++------------- 8 files changed, 69 insertions(+), 177 deletions(-) create mode 100644 pkg/.DS_Store delete mode 100644 pkg/common/channel.go diff --git a/cmd/sfu/main.go b/cmd/sfu/main.go index 2a5d41f..17deaff 100644 --- a/cmd/sfu/main.go +++ b/cmd/sfu/main.go @@ -100,11 +100,16 @@ func main() { return } - // Create a router to route incoming To-Device messages to the right conference. - routerChannel := routing.NewRouter(matrixClient, connectionFactory, config.Conference) + // Create a channel which we'll use to send events to the router. + matrixEvents := make(chan *event.Event) + + // Start a router that will receive events from the matrix client and route them to the appropriate conference. + routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference) // Start matrix client sync. This function will block until the sync fails. matrixClient.RunSyncing(func(e *event.Event) { - routerChannel <- e + matrixEvents <- e }) + + close(matrixEvents) } diff --git a/pkg/.DS_Store b/pkg/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..b2d281837a908135dc600c4b82085dea6eb45751 GIT binary patch literal 6148 zcmeHK%}T>S5Z-NTe<(r@3OxqA7Hmt!f|n5M3mDOZN=;1BV7gnH)Er77XMG``#OHBl zcXJ2^youPEvir@>&u->}?hj*(yX$bsn8g@tLqp`KR0x_YT{RPo$kiN@i6BdsK`euc ziTBn2Zinj z*U>ziSqJA*q<$pkE0qw1a|pS+i9{$fSFS~vs9aAwIv_Kvx7nNw#(jG<9B%veW-b2!bB#C) S+EqFrT?7;%)DZ)}z`z%ONlZ2X literal 0 HcmV?d00001 diff --git a/pkg/common/channel.go b/pkg/common/channel.go deleted file mode 100644 index c30932f..0000000 --- a/pkg/common/channel.go +++ /dev/null @@ -1,63 +0,0 @@ -package common - -import "sync/atomic" - -// Creates a new channel, returns two counterparts of it where one can only send and another can only receive. -// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail -// to send any messages to the channel over `Send“. -func NewChannel[M any](channelSize int) (Sender[M], Receiver[M]) { - channel := make(chan M, channelSize) - closed := &atomic.Bool{} - sender := Sender[M]{channel, closed} - receiver := Receiver[M]{channel, closed} - return sender, receiver -} - -// Sender counterpart of the channel. -type Sender[M any] struct { - // The channel itself. - channel chan<- M - // Atomic variable that indicates whether the channel is closed. - receiverClosed *atomic.Bool -} - -// Tries to send a message if the channel is not closed. -// Returns the message back if the channel is closed. -func (s *Sender[M]) Send(message M) *M { - if !s.receiverClosed.Load() { - s.channel <- message - return nil - } else { - return &message - } -} - -// The receiver counterpart of the channel. -type Receiver[M any] struct { - // The channel itself. It's public, so that we can combine it in `select` statements. - Channel <-chan M - // Atomic variable that indicates whether the channel is closed. - receiverClosed *atomic.Bool -} - -// Marks the channel as closed, which means that no messages could be sent via this channel. -// Any attempt to send a message would result in an error. This is similar to closing the -// channel except that we don't close the underlying channel (since in Go receivers can't -// close the channel). -// -// This function reads (in a non-blocking way) all pending messages until blocking. Otherwise, -// they will stay forver in a channel and get lost. -func (r *Receiver[M]) Close() []M { - r.receiverClosed.Store(true) - - messages := make([]M, 0) - for { - msg, ok := <-r.Channel - if !ok { - break - } - messages = append(messages, msg) - } - - return messages -} diff --git a/pkg/conference/matrix_message_processing.go b/pkg/conference/matrix_message_processing.go index 428b03c..7a23124 100644 --- a/pkg/conference/matrix_message_processing.go +++ b/pkg/conference/matrix_message_processing.go @@ -15,9 +15,8 @@ import ( type MessageContent interface{} type MatrixMessage struct { - Sender participant.ID - Content MessageContent - RawEvent *event.Event + Sender participant.ID + Content MessageContent } // New participant tries to join the conference. diff --git a/pkg/conference/processing.go b/pkg/conference/processing.go index ec6f650..a089af4 100644 --- a/pkg/conference/processing.go +++ b/pkg/conference/processing.go @@ -11,27 +11,21 @@ import ( // This is essentially the main loop of the conference. // If this function returns, the conference is over. func (c *Conference) processMessages() { + // When the main loop of the conference ends, clean up the resources. + defer close(c.conferenceDone) + defer c.matrixWorker.stop() + for { select { case msg := <-c.peerMessages: c.processPeerMessage(msg) - case msg := <-c.matrixMessages.Channel: + case msg := <-c.matrixEvents: c.processMatrixMessage(msg) } // If there are no more participants, stop the conference. if !c.tracker.HasParticipants() { c.logger.Info("No more participants, stopping the conference") - // Close the channel so that the sender can't push any messages. - unreadMessages := c.matrixMessages.Close() - - // Send the information that we ended to the owner and pass the message - // that we did not process (so that we don't drop it silently). - c.endNotifier.Notify(unreadMessages) - - // Stop the matrix worker. - c.matrixWorker.stop() - return } } diff --git a/pkg/conference/start.go b/pkg/conference/start.go index b09bd9e..1c9de50 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -28,17 +28,17 @@ import ( ) // Starts a new conference or fails and returns an error. +// The conference ends when the last participant leaves. func StartConference( confID string, config Config, peerConnectionFactory *webrtc_ext.PeerConnectionFactory, signaling signaling.MatrixSignaler, - conferenceEndNotifier ConferenceEndNotifier, + matrixEvents <-chan MatrixMessage, userID id.UserID, inviteEvent *event.CallInviteEventContent, -) (*common.Sender[MatrixMessage], error) { - sender, receiver := common.NewChannel[MatrixMessage](128) - +) (<-chan struct{}, error) { + done := make(chan struct{}) conference := &Conference{ id: confID, config: config, @@ -47,23 +47,18 @@ func StartConference( matrixWorker: newMatrixWorker(signaling), tracker: *participant.NewParticipantTracker(), streamsMetadata: make(event.CallSDPStreamMetadata), - endNotifier: conferenceEndNotifier, peerMessages: make(chan common.Message[participant.ID, peer.MessageContent]), - matrixMessages: receiver, + matrixEvents: matrixEvents, + conferenceDone: done, } participantID := participant.ID{UserID: userID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID} if err := conference.onNewParticipant(participantID, inviteEvent); err != nil { - return nil, err + return nil, nil } // Start conference "main loop". go conference.processMessages() - return &sender, nil -} - -type ConferenceEndNotifier interface { - // Called when the conference ends. - Notify(unread []MatrixMessage) + return done, nil } diff --git a/pkg/conference/state.go b/pkg/conference/state.go index 04a0fed..c40f9c0 100644 --- a/pkg/conference/state.go +++ b/pkg/conference/state.go @@ -11,10 +11,10 @@ import ( // A single conference. Call and conference mean the same in context of Matrix. type Conference struct { - id string - config Config - logger *logrus.Entry - endNotifier ConferenceEndNotifier + id string + config Config + logger *logrus.Entry + conferenceDone chan<- struct{} connectionFactory *webrtc_ext.PeerConnectionFactory matrixWorker *matrixWorker @@ -22,8 +22,8 @@ type Conference struct { tracker participant.Tracker streamsMetadata event.CallSDPStreamMetadata - peerMessages chan common.Message[participant.ID, peer.MessageContent] - matrixMessages common.Receiver[MatrixMessage] + peerMessages chan common.Message[participant.ID, peer.MessageContent] + matrixEvents <-chan MatrixMessage } func (c *Conference) getParticipant(id participant.ID) *participant.Participant { diff --git a/pkg/routing/router.go b/pkg/routing/router.go index da1f3f5..b3535b2 100644 --- a/pkg/routing/router.go +++ b/pkg/routing/router.go @@ -17,7 +17,6 @@ limitations under the License. package routing import ( - "github.com/matrix-org/waterfall/pkg/common" conf "github.com/matrix-org/waterfall/pkg/conference" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/signaling" @@ -27,58 +26,43 @@ import ( "maunium.net/go/mautrix/id" ) -type Conference = common.Sender[conf.MatrixMessage] - // The top-level state of the Router. type Router struct { // Matrix matrix. matrix *signaling.MatrixClient // Sinks of all conferences (all calls that are currently forwarded by this SFU). - conferenceSinks map[string]*Conference + conferenceSinks map[string]*conferenceStage // Configuration for the calls. config conf.Config - // A channel to serialize all incoming events to the Router. - channel chan RouterMessage + // Channel for reading incoming Matrix SDK To-Device events and distributing them to the conferences. + matrixEvents <-chan *event.Event + // Channel for handling conference ended events. // Peer connection factory that can be used to create pre-configured peer connections. connectionFactory *webrtc_ext.PeerConnectionFactory } // Creates a new instance of the SFU with the given configuration. -func NewRouter( +func RunRouter( matrix *signaling.MatrixClient, connectionFactory *webrtc_ext.PeerConnectionFactory, + matrixEvents <-chan *event.Event, config conf.Config, -) chan<- RouterMessage { +) { router := &Router{ matrix: matrix, - conferenceSinks: make(map[string]*common.Sender[conf.MatrixMessage]), + conferenceSinks: make(map[string]*conferenceStage), config: config, - channel: make(chan RouterMessage), + matrixEvents: matrixEvents, connectionFactory: connectionFactory, } // Start the main loop of the Router. go func() { - for msg := range router.channel { - switch msg := msg.(type) { + for msg := range router.matrixEvents { // To-Device message received from the remote peer. - case MatrixMessage: - router.handleMatrixEvent(msg) - // One of the conferences has ended. - case ConferenceEndedMessage: - // Remove the conference that ended from the list. - delete(router.conferenceSinks, msg.conferenceID) - - // Process the message that was not read by the conference. - for _, msg := range msg.unread { - // TODO: We actually already know the type, so we can do this better. - router.handleMatrixEvent(msg.RawEvent) - } - } + router.handleMatrixEvent(msg) } }() - - return router.channel } // Handles incoming To-Device events that the SFU receives from clients. @@ -120,12 +104,15 @@ func (r *Router) handleMatrixEvent(evt *event.Event) { // are expected to operate on an existing conference that is running on the SFU. if conference == nil && evt.Type.Type == event.ToDeviceCallInvite.Type { logger.Infof("creating new conference %s", conferenceID) - conferenceSink, err := conf.StartConference( + + matrixEvents := make(chan conf.MatrixMessage) + + conferenceDone, err := conf.StartConference( conferenceID, r.config, r.connectionFactory, r.matrix.CreateForConference(conferenceID), - createConferenceEndNotifier(conferenceID, r.channel), + matrixEvents, userID, evt.Content.AsCallInvite(), ) @@ -134,7 +121,7 @@ func (r *Router) handleMatrixEvent(evt *event.Event) { return } - r.conferenceSinks[conferenceID] = conferenceSink + r.conferenceSinks[conferenceID] = &conferenceStage{matrixEvents, conferenceDone} return } @@ -144,70 +131,45 @@ func (r *Router) handleMatrixEvent(evt *event.Event) { return } - // A helper function to deal with messages that can't be sent due to the conference closed. - // Not a function due to the need to capture local environment. - sendToConference := func(eventContent conf.MessageContent) { - sender := participant.ID{userID, id.DeviceID(deviceID), callID} - // At this point the conference is not nil. - // Let's check if the channel is still available. - if conference.Send(conf.MatrixMessage{Content: eventContent, RawEvent: evt, Sender: sender}) != nil { - // If sending failed, then the conference is over. - delete(r.conferenceSinks, conferenceID) - // Since we were not able to send the message, let's re-process it now. - // Note, we probably do not want to block here! - r.handleMatrixEvent(evt) - } - } + // Sender of the To-Device message. + sender := participant.ID{userID, id.DeviceID(deviceID), callID} + var content conf.MessageContent switch evt.Type.Type { // Someone tries to participate in a call (join a call). case event.ToDeviceCallInvite.Type: // If there is an invitation sent and the conference does not exist, create one. - sendToConference(evt.Content.AsCallInvite()) + content = evt.Content.AsCallInvite() case event.ToDeviceCallCandidates.Type: // Someone tries to send ICE candidates to the existing call. - sendToConference(evt.Content.AsCallCandidates()) + content = evt.Content.AsCallCandidates() case event.ToDeviceCallSelectAnswer.Type: // Someone informs us about them accepting our (SFU's) SDP answer for an existing call. - sendToConference(evt.Content.AsCallSelectAnswer()) + content = evt.Content.AsCallSelectAnswer() case event.ToDeviceCallHangup.Type: // Someone tries to inform us about leaving an existing call. - sendToConference(evt.Content.AsCallHangup()) + content = evt.Content.AsCallHangup() default: logger.Warnf("ignoring event that we must not receive: %s", evt.Type.Type) + return } -} -type RouterMessage = interface{} - -type MatrixMessage = *event.Event - -// Message that is sent from the conference when the conference is ended. -type ConferenceEndedMessage struct { - // The ID of the conference that has ended. - conferenceID string - // A message (or messages in future) that has not been processed (if any). - unread []conf.MatrixMessage -} - -// A simple wrapper around channel that contains the ID of the conference that sent the message. -type ConferenceEndNotifier struct { - conferenceID string - channel chan<- interface{} -} - -// Crates a simple notifier with a conference with a given ID. -func createConferenceEndNotifier(conferenceID string, channel chan<- RouterMessage) *ConferenceEndNotifier { - return &ConferenceEndNotifier{ - conferenceID: conferenceID, - channel: channel, + // Send the message to the conference. + select { + case <-conference.done: + // Conference has just gotten closed, let's remove it from the list of conferences. + delete(r.conferenceSinks, conferenceID) + close(conference.sink) + + // Since we were not able to send the message, let's re-process it now. + r.handleMatrixEvent(evt) + case conference.sink <- conf.MatrixMessage{Content: content, Sender: sender}: + // Ok,sent! + return } } -// A function that a conference calls when it is ended. -func (c *ConferenceEndNotifier) Notify(unread []conf.MatrixMessage) { - c.channel <- ConferenceEndedMessage{ - conferenceID: c.conferenceID, - unread: unread, - } +type conferenceStage struct { + sink chan<- conf.MatrixMessage + done <-chan struct{} } From 4ce8b07fb403a09aaa961bf3227bfe833a0dc9b6 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 2 Feb 2023 21:54:27 +0100 Subject: [PATCH 10/17] track: move track info out of `common` Usage of `common` is an anti-pattern. --- pkg/conference/participant/track.go | 32 ++++++++++++------------ pkg/conference/participant/track_test.go | 18 ++++++------- pkg/conference/participant/tracker.go | 22 ++++++++-------- pkg/peer/messages.go | 14 +++++------ pkg/peer/peer.go | 2 +- pkg/peer/remote_track.go | 16 ++++++------ pkg/peer/state/peer_state.go | 10 ++++---- pkg/peer/subscription/audio.go | 8 +++--- pkg/peer/subscription/subscription.go | 6 ++--- pkg/peer/subscription/video.go | 29 ++++++++++----------- pkg/peer/webrtc_callbacks.go | 4 +-- pkg/{common => webrtc_ext}/track_info.go | 2 +- 12 files changed, 81 insertions(+), 82 deletions(-) rename pkg/{common => webrtc_ext}/track_info.go (98%) diff --git a/pkg/conference/participant/track.go b/pkg/conference/participant/track.go index dad4cd8..bd908c4 100644 --- a/pkg/conference/participant/track.go +++ b/pkg/conference/participant/track.go @@ -1,7 +1,7 @@ package participant import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/webrtc/v3" "golang.org/x/exp/slices" ) @@ -13,9 +13,9 @@ type PublishedTrack struct { // Owner of a published track. Owner ID // Info about the track. - Info common.TrackInfo + Info webrtc_ext.TrackInfo // Available simulcast Layers. - Layers []common.SimulcastLayer + Layers []webrtc_ext.SimulcastLayer // Track metadata. Metadata TrackMetadata // Output track (if any). I.e. a track that would contain all RTP packets @@ -24,11 +24,11 @@ type PublishedTrack struct { } // Calculate the layer that we can use based on the requirements passed as parameters and available layers. -func (p *PublishedTrack) GetOptimalLayer(requestedWidth, requestedHeight int) common.SimulcastLayer { +func (p *PublishedTrack) GetOptimalLayer(requestedWidth, requestedHeight int) webrtc_ext.SimulcastLayer { // Audio track. For them we don't have any simulcast. We also don't have any simulcast for video // if there was no simulcast enabled at all. if p.Info.Kind == webrtc.RTPCodecTypeAudio || len(p.Layers) == 0 { - return common.SimulcastLayerNone + return webrtc_ext.SimulcastLayerNone } // Video track. Calculate the optimal layer closest to the requested resolution. @@ -36,16 +36,16 @@ func (p *PublishedTrack) GetOptimalLayer(requestedWidth, requestedHeight int) co // Ideally, here we would need to send an error if the desired layer is not available, but we don't // have a way to do it. So we just return the closest available layer. - priority := []common.SimulcastLayer{ + priority := []webrtc_ext.SimulcastLayer{ desiredLayer, - common.SimulcastLayerMedium, - common.SimulcastLayerLow, - common.SimulcastLayerHigh, + webrtc_ext.SimulcastLayerMedium, + webrtc_ext.SimulcastLayerLow, + webrtc_ext.SimulcastLayerHigh, } // More Go boilerplate. for _, desiredLayer := range priority { - layerIndex := slices.IndexFunc(p.Layers, func(simulcast common.SimulcastLayer) bool { + layerIndex := slices.IndexFunc(p.Layers, func(simulcast webrtc_ext.SimulcastLayer) bool { return simulcast == desiredLayer }) @@ -56,7 +56,7 @@ func (p *PublishedTrack) GetOptimalLayer(requestedWidth, requestedHeight int) co // Actually this part will never be executed, because if we got to this point, // we know that we at least have one layer available. - return common.SimulcastLayerLow + return webrtc_ext.SimulcastLayerLow } // Metadata that we have received about this track from a user. @@ -69,21 +69,21 @@ type TrackMetadata struct { // maximum resolution that we can get from the user. We assume that a medium quality layer is half the size of // the video (**but not half of the resolution**). I.e. medium quality is high quality divided by 4. And low // quality is medium quality divided by 4 (which is the same as the high quality dividied by 16). -func calculateDesiredLayer(fullWidth, fullHeight int, desiredWidth, desiredHeight int) common.SimulcastLayer { +func calculateDesiredLayer(fullWidth, fullHeight int, desiredWidth, desiredHeight int) webrtc_ext.SimulcastLayer { // Calculate combined length of width and height for the full and desired size videos. fullSize := fullWidth + fullHeight desiredSize := desiredWidth + desiredHeight if fullSize == 0 || desiredSize == 0 { - return common.SimulcastLayerLow + return webrtc_ext.SimulcastLayerLow } // Determine which simulcast desiredLayer to subscribe to based on the requested resolution. if ratio := float32(fullSize) / float32(desiredSize); ratio <= 1 { - return common.SimulcastLayerHigh + return webrtc_ext.SimulcastLayerHigh } else if ratio <= 2 { - return common.SimulcastLayerMedium + return webrtc_ext.SimulcastLayerMedium } - return common.SimulcastLayerLow + return webrtc_ext.SimulcastLayerLow } diff --git a/pkg/conference/participant/track_test.go b/pkg/conference/participant/track_test.go index 612db43..3d8f9a1 100644 --- a/pkg/conference/participant/track_test.go +++ b/pkg/conference/participant/track_test.go @@ -3,25 +3,25 @@ package participant_test import ( "testing" - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/conference/participant" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/webrtc/v3" ) func TestGetOptimalLayer(t *testing.T) { // Helper function for a quick an descriptive test case definition. - layers := func(layers ...common.SimulcastLayer) []common.SimulcastLayer { + layers := func(layers ...webrtc_ext.SimulcastLayer) []webrtc_ext.SimulcastLayer { return layers } // Shortcuts for easy and descriptive test case definition. - low, mid, high := common.SimulcastLayerLow, common.SimulcastLayerMedium, common.SimulcastLayerHigh + low, mid, high := webrtc_ext.SimulcastLayerLow, webrtc_ext.SimulcastLayerMedium, webrtc_ext.SimulcastLayerHigh cases := []struct { - availableLayers []common.SimulcastLayer + availableLayers []webrtc_ext.SimulcastLayer fullWidth, fullHeight int desiredWidth, desiredHeight int - expectedOptimalLayer common.SimulcastLayer + expectedOptimalLayer webrtc_ext.SimulcastLayer }{ {layers(low, mid, high), 1728, 1056, 878, 799, mid}, // Screen sharing (Dave's case). {layers(low, mid, high), 1920, 1080, 320, 240, low}, // max=1080p, desired=240p, result=low. @@ -44,7 +44,7 @@ func TestGetOptimalLayer(t *testing.T) { } mock := participant.PublishedTrack{ - Info: common.TrackInfo{ + Info: webrtc_ext.TrackInfo{ Kind: webrtc.RTPCodecTypeVideo, }, } @@ -63,13 +63,13 @@ func TestGetOptimalLayer(t *testing.T) { func TestGetOptimalLayerAudio(t *testing.T) { mock := participant.PublishedTrack{ - Info: common.TrackInfo{ + Info: webrtc_ext.TrackInfo{ Kind: webrtc.RTPCodecTypeAudio, }, } - mock.Layers = []common.SimulcastLayer{common.SimulcastLayerLow} - if mock.GetOptimalLayer(100, 100) != common.SimulcastLayerNone { + mock.Layers = []webrtc_ext.SimulcastLayer{webrtc_ext.SimulcastLayerLow} + if mock.GetOptimalLayer(100, 100) != webrtc_ext.SimulcastLayerNone { t.Fatal("Expected no simulcast layer for audio") } } diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 1c26779..0a3788e 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -1,8 +1,8 @@ package participant import ( - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/peer/subscription" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" @@ -91,16 +91,16 @@ func (t *Tracker) RemoveParticipant(participantID ID) map[string]bool { // that has been published and that we must take into account from now on. func (t *Tracker) AddPublishedTrack( participantID ID, - info common.TrackInfo, - simulcast common.SimulcastLayer, + info webrtc_ext.TrackInfo, + simulcast webrtc_ext.SimulcastLayer, metadata TrackMetadata, outputTrack *webrtc.TrackLocalStaticRTP, ) { // If this is a new track, let's add it to the list of published and inform participants. track, found := t.publishedTracks[info.TrackID] if !found { - layers := []common.SimulcastLayer{} - if simulcast != common.SimulcastLayerNone { + layers := []webrtc_ext.SimulcastLayer{} + if simulcast != webrtc_ext.SimulcastLayerNone { layers = append(layers, simulcast) } @@ -116,8 +116,8 @@ func (t *Tracker) AddPublishedTrack( } // If it's just a new layer, let's add it to the list of layers of the existing published track. - fn := func(layer common.SimulcastLayer) bool { return layer == simulcast } - if simulcast != common.SimulcastLayerNone && slices.IndexFunc(track.Layers, fn) == -1 { + fn := func(layer webrtc_ext.SimulcastLayer) bool { return layer == simulcast } + if simulcast != webrtc_ext.SimulcastLayerNone && slices.IndexFunc(track.Layers, fn) == -1 { track.Layers = append(track.Layers, simulcast) t.publishedTracks[info.TrackID] = track } @@ -162,8 +162,8 @@ func (t *Tracker) RemovePublishedTrack(id TrackID) { } type SubscribeRequest struct { - common.TrackInfo - Simulcast common.SimulcastLayer + webrtc_ext.TrackInfo + Simulcast webrtc_ext.SimulcastLayer } // Subscribes a given participant to the tracks that are passed as a parameter. @@ -197,7 +197,7 @@ func (t *Tracker) Subscribe(participantID ID, requests []SubscribeRequest) { request.TrackInfo, request.Simulcast, participant.Peer, - func(track common.TrackInfo, simulcast common.SimulcastLayer) error { + func(track webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer) error { return owner.Peer.RequestKeyFrame(track, simulcast) }, participant.Logger, @@ -254,7 +254,7 @@ func (t *Tracker) Unsubscribe(participantID ID, tracks []TrackID) { } // Processes an RTP packet received on a given track. -func (t *Tracker) ProcessRTP(info common.TrackInfo, simulcast common.SimulcastLayer, packet *rtp.Packet) { +func (t *Tracker) ProcessRTP(info webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer, packet *rtp.Packet) { for _, subscription := range t.subscribers[info.TrackID] { if subscription.Simulcast() == simulcast { if err := subscription.WriteRTP(*packet); err != nil { diff --git a/pkg/peer/messages.go b/pkg/peer/messages.go index d8a7aac..4f67a6d 100644 --- a/pkg/peer/messages.go +++ b/pkg/peer/messages.go @@ -1,7 +1,7 @@ package peer import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtp" "github.com/pion/webrtc/v3" "maunium.net/go/mautrix/event" @@ -19,9 +19,9 @@ type LeftTheCall struct { type NewTrackPublished struct { // Information about the track (ID etc). - common.TrackInfo + webrtc_ext.TrackInfo // SimulcastLayer configuration (can be `None` for non-simulcast tracks and for audio tracks). - SimulcastLayer common.SimulcastLayer + SimulcastLayer webrtc_ext.SimulcastLayer // Output track (if any) that could be used to send data to the peer. Will be `nil` if such // track does not exist, in which case the caller is expected to listen to `RtpPacketReceived` // messages. @@ -29,13 +29,13 @@ type NewTrackPublished struct { } type PublishedTrackFailed struct { - common.TrackInfo - SimulcastLayer common.SimulcastLayer + webrtc_ext.TrackInfo + SimulcastLayer webrtc_ext.SimulcastLayer } type RTPPacketReceived struct { - common.TrackInfo - SimulcastLayer common.SimulcastLayer + webrtc_ext.TrackInfo + SimulcastLayer webrtc_ext.SimulcastLayer Packet *rtp.Packet } diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index a17b196..637bbc0 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -84,7 +84,7 @@ func (p *Peer[ID]) Terminate() { } // Request a key frame from the peer connection. -func (p *Peer[ID]) RequestKeyFrame(info common.TrackInfo, simulcast common.SimulcastLayer) error { +func (p *Peer[ID]) RequestKeyFrame(info webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer) error { // Find the right track. track := p.state.GetRemoteTrack(info.TrackID, simulcast) if track == nil { diff --git a/pkg/peer/remote_track.go b/pkg/peer/remote_track.go index f86792a..97a82ae 100644 --- a/pkg/peer/remote_track.go +++ b/pkg/peer/remote_track.go @@ -4,19 +4,17 @@ import ( "errors" "io" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtp" "github.com/pion/webrtc/v3" ) func (p *Peer[ID]) handleNewVideoTrack( - trackInfo common.TrackInfo, + trackInfo webrtc_ext.TrackInfo, remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver, ) { - p.logger.Infof("ontrack got video track %s", trackInfo.TrackID) - - simulcast := common.RIDToSimulcastLayer(remoteTrack.RID()) + simulcast := webrtc_ext.RIDToSimulcastLayer(remoteTrack.RID()) p.handleRemoteTrack(remoteTrack, trackInfo, simulcast, nil, func(packet *rtp.Packet) error { p.sink.Send(RTPPacketReceived{trackInfo, simulcast, packet}) @@ -25,7 +23,7 @@ func (p *Peer[ID]) handleNewVideoTrack( } func (p *Peer[ID]) handleNewAudioTrack( - trackInfo common.TrackInfo, + trackInfo webrtc_ext.TrackInfo, remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver, ) { @@ -41,7 +39,7 @@ func (p *Peer[ID]) handleNewAudioTrack( return } - p.handleRemoteTrack(remoteTrack, trackInfo, common.SimulcastLayerNone, localTrack, func(packet *rtp.Packet) error { + p.handleRemoteTrack(remoteTrack, trackInfo, webrtc_ext.SimulcastLayerNone, localTrack, func(packet *rtp.Packet) error { if err = localTrack.WriteRTP(packet); err != nil && !errors.Is(err, io.ErrClosedPipe) { return err } @@ -51,8 +49,8 @@ func (p *Peer[ID]) handleNewAudioTrack( func (p *Peer[ID]) handleRemoteTrack( remoteTrack *webrtc.TrackRemote, - trackInfo common.TrackInfo, - simulcast common.SimulcastLayer, + trackInfo webrtc_ext.TrackInfo, + simulcast webrtc_ext.SimulcastLayer, outputTrack *webrtc.TrackLocalStaticRTP, handleRtpFn func(*rtp.Packet) error, ) { diff --git a/pkg/peer/state/peer_state.go b/pkg/peer/state/peer_state.go index 45eac7d..b277139 100644 --- a/pkg/peer/state/peer_state.go +++ b/pkg/peer/state/peer_state.go @@ -3,13 +3,13 @@ package state import ( "sync" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/webrtc/v3" ) type RemoteTrackId struct { id string - simulcast common.SimulcastLayer + simulcast webrtc_ext.SimulcastLayer } type PeerState struct { @@ -28,17 +28,17 @@ func (p *PeerState) AddRemoteTrack(track *webrtc.TrackRemote) { p.mutex.Lock() defer p.mutex.Unlock() - p.remoteTracks[RemoteTrackId{track.ID(), common.RIDToSimulcastLayer(track.RID())}] = track + p.remoteTracks[RemoteTrackId{track.ID(), webrtc_ext.RIDToSimulcastLayer(track.RID())}] = track } func (p *PeerState) RemoveRemoteTrack(track *webrtc.TrackRemote) { p.mutex.Lock() defer p.mutex.Unlock() - delete(p.remoteTracks, RemoteTrackId{track.ID(), common.RIDToSimulcastLayer(track.RID())}) + delete(p.remoteTracks, RemoteTrackId{track.ID(), webrtc_ext.RIDToSimulcastLayer(track.RID())}) } -func (p *PeerState) GetRemoteTrack(id string, simulcast common.SimulcastLayer) *webrtc.TrackRemote { +func (p *PeerState) GetRemoteTrack(id string, simulcast webrtc_ext.SimulcastLayer) *webrtc.TrackRemote { p.mutex.Lock() defer p.mutex.Unlock() diff --git a/pkg/peer/subscription/audio.go b/pkg/peer/subscription/audio.go index 7e99ede..c1b40c3 100644 --- a/pkg/peer/subscription/audio.go +++ b/pkg/peer/subscription/audio.go @@ -5,7 +5,7 @@ import ( "fmt" "io" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtp" "github.com/pion/webrtc/v3" ) @@ -42,11 +42,11 @@ func (s *AudioSubscription) WriteRTP(packet rtp.Packet) error { return fmt.Errorf("Bug: no write RTP logic for an audio subscription!") } -func (s *AudioSubscription) SwitchLayer(simulcast common.SimulcastLayer) { +func (s *AudioSubscription) SwitchLayer(simulcast webrtc_ext.SimulcastLayer) { } -func (s *AudioSubscription) Simulcast() common.SimulcastLayer { - return common.SimulcastLayerNone +func (s *AudioSubscription) Simulcast() webrtc_ext.SimulcastLayer { + return webrtc_ext.SimulcastLayerNone } func (s *AudioSubscription) readRTCP() { diff --git a/pkg/peer/subscription/subscription.go b/pkg/peer/subscription/subscription.go index deb8a04..bf6f6b4 100644 --- a/pkg/peer/subscription/subscription.go +++ b/pkg/peer/subscription/subscription.go @@ -1,7 +1,7 @@ package subscription import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtp" "github.com/pion/webrtc/v3" ) @@ -9,8 +9,8 @@ import ( type Subscription interface { Unsubscribe() error WriteRTP(packet rtp.Packet) error - SwitchLayer(simulcast common.SimulcastLayer) - Simulcast() common.SimulcastLayer + SwitchLayer(simulcast webrtc_ext.SimulcastLayer) + Simulcast() webrtc_ext.SimulcastLayer } type SubscriptionController interface { diff --git a/pkg/peer/subscription/video.go b/pkg/peer/subscription/video.go index 9738d63..3d8012c 100644 --- a/pkg/peer/subscription/video.go +++ b/pkg/peer/subscription/video.go @@ -9,19 +9,20 @@ import ( "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/peer/subscription/rewriter" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) -type RequestKeyFrameFn = func(track common.TrackInfo, simulcast common.SimulcastLayer) error +type RequestKeyFrameFn = func(track webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer) error type VideoSubscription struct { rtpSender *webrtc.RTPSender - info common.TrackInfo - currentLayer atomic.Int32 // atomic common.SimulcastLayer + info webrtc_ext.TrackInfo + currentLayer atomic.Int32 // atomic webrtc_ext.SimulcastLayer controller SubscriptionController requestKeyFrameFn RequestKeyFrameFn @@ -30,8 +31,8 @@ type VideoSubscription struct { } func NewVideoSubscription( - info common.TrackInfo, - simulcast common.SimulcastLayer, + info webrtc_ext.TrackInfo, + simulcast webrtc_ext.SimulcastLayer, controller SubscriptionController, requestKeyFrameFn RequestKeyFrameFn, logger *logrus.Entry, @@ -47,7 +48,7 @@ func NewVideoSubscription( return nil, fmt.Errorf("Failed to add track: %s", err) } - // Atomic version of the common.SimulcastLayer. + // Atomic version of the webrtc_ext.SimulcastLayer. var currentLayer atomic.Int32 currentLayer.Store(int32(simulcast)) @@ -73,7 +74,7 @@ func NewVideoSubscription( ChannelSize: 1, // We really don't want to buffer old packets. Timeout: 2 * time.Second, OnTimeout: func() { - layer := common.SimulcastLayer(subscription.currentLayer.Load()) + layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) logger.Warnf("No RTP on subscription %s (%s)", subscription.info.TrackID, layer) subscription.requestKeyFrame() }, @@ -94,7 +95,7 @@ func NewVideoSubscription( func (s *VideoSubscription) Unsubscribe() error { s.worker.Stop() - s.logger.Infof("Unsubscribing from %s (%s)", s.info.TrackID, common.SimulcastLayer(s.currentLayer.Load())) + s.logger.Infof("Unsubscribing from %s (%s)", s.info.TrackID, webrtc_ext.SimulcastLayer(s.currentLayer.Load())) return s.controller.RemoveTrack(s.rtpSender) } @@ -103,18 +104,18 @@ func (s *VideoSubscription) WriteRTP(packet rtp.Packet) error { return s.worker.Send(packet) } -func (s *VideoSubscription) SwitchLayer(simulcast common.SimulcastLayer) { +func (s *VideoSubscription) SwitchLayer(simulcast webrtc_ext.SimulcastLayer) { s.logger.Infof("Switching layer on %s to %s", s.info.TrackID, simulcast) s.currentLayer.Store(int32(simulcast)) s.requestKeyFrame() } -func (s *VideoSubscription) TrackInfo() common.TrackInfo { +func (s *VideoSubscription) TrackInfo() webrtc_ext.TrackInfo { return s.info } -func (s *VideoSubscription) Simulcast() common.SimulcastLayer { - return common.SimulcastLayer(s.currentLayer.Load()) +func (s *VideoSubscription) Simulcast() webrtc_ext.SimulcastLayer { + return webrtc_ext.SimulcastLayer(s.currentLayer.Load()) } // Read incoming RTCP packets. Before these packets are returned they are processed by interceptors. @@ -123,7 +124,7 @@ func (s *VideoSubscription) readRTCP() { packets, _, err := s.rtpSender.ReadRTCP() if err != nil { if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) { - layer := common.SimulcastLayer(s.currentLayer.Load()) + layer := webrtc_ext.SimulcastLayer(s.currentLayer.Load()) s.logger.Warnf("failed to read RTCP on track: %s (%s): %s", s.info.TrackID, layer, err) s.worker.Stop() return @@ -142,7 +143,7 @@ func (s *VideoSubscription) readRTCP() { } func (s *VideoSubscription) requestKeyFrame() { - layer := common.SimulcastLayer(s.currentLayer.Load()) + layer := webrtc_ext.SimulcastLayer(s.currentLayer.Load()) if err := s.requestKeyFrameFn(s.info, layer); err != nil { s.logger.Errorf("Failed to request key frame: %s", err) } diff --git a/pkg/peer/webrtc_callbacks.go b/pkg/peer/webrtc_callbacks.go index ff8e54c..31288b8 100644 --- a/pkg/peer/webrtc_callbacks.go +++ b/pkg/peer/webrtc_callbacks.go @@ -1,7 +1,7 @@ package peer import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/webrtc/v3" "maunium.net/go/mautrix/event" ) @@ -10,7 +10,7 @@ import ( // we call this function each time a new track is received. func (p *Peer[ID]) onRtpTrackReceived(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { // Construct a new track info assuming that there is no simulcast. - trackInfo := common.TrackInfoFromTrack(remoteTrack) + trackInfo := webrtc_ext.TrackInfoFromTrack(remoteTrack) switch trackInfo.Kind { case webrtc.RTPCodecTypeVideo: diff --git a/pkg/common/track_info.go b/pkg/webrtc_ext/track_info.go similarity index 98% rename from pkg/common/track_info.go rename to pkg/webrtc_ext/track_info.go index c00c123..03b1e60 100644 --- a/pkg/common/track_info.go +++ b/pkg/webrtc_ext/track_info.go @@ -1,4 +1,4 @@ -package common +package webrtc_ext import ( "github.com/pion/webrtc/v3" From 974f469215ba637f158ae71ef03be0f02aa2c681 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 2 Feb 2023 21:58:34 +0100 Subject: [PATCH 11/17] worker: move to its own package Package `common` is an antipattern. --- pkg/conference/matrix_worker.go | 8 ++++---- pkg/peer/subscription/video.go | 8 ++++---- pkg/{common => worker}/worker.go | 6 +++--- pkg/{common => worker}/worker_test.go | 8 ++++---- 4 files changed, 15 insertions(+), 15 deletions(-) rename pkg/{common => worker}/worker.go (95%) rename pkg/{common => worker}/worker_test.go (61%) diff --git a/pkg/conference/matrix_worker.go b/pkg/conference/matrix_worker.go index b6d4b70..b6f0b37 100644 --- a/pkg/conference/matrix_worker.go +++ b/pkg/conference/matrix_worker.go @@ -3,19 +3,19 @@ package conference import ( "time" - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/signaling" + "github.com/matrix-org/waterfall/pkg/worker" "github.com/sirupsen/logrus" "maunium.net/go/mautrix/id" ) type matrixWorker struct { - worker *common.Worker[signaling.MatrixMessage] + worker *worker.Worker[signaling.MatrixMessage] deviceID id.DeviceID } func newMatrixWorker(handler signaling.MatrixSignaler) *matrixWorker { - workerConfig := common.WorkerConfig[signaling.MatrixMessage]{ + workerConfig := worker.Config[signaling.MatrixMessage]{ ChannelSize: 128, Timeout: time.Hour, OnTimeout: func() {}, @@ -23,7 +23,7 @@ func newMatrixWorker(handler signaling.MatrixSignaler) *matrixWorker { } matrixWorker := &matrixWorker{ - worker: common.StartWorker(workerConfig), + worker: worker.StartWorker(workerConfig), deviceID: handler.DeviceID(), } diff --git a/pkg/peer/subscription/video.go b/pkg/peer/subscription/video.go index 3d8012c..58cbdd9 100644 --- a/pkg/peer/subscription/video.go +++ b/pkg/peer/subscription/video.go @@ -7,9 +7,9 @@ import ( "sync/atomic" "time" - "github.com/matrix-org/waterfall/pkg/common" "github.com/matrix-org/waterfall/pkg/peer/subscription/rewriter" "github.com/matrix-org/waterfall/pkg/webrtc_ext" + "github.com/matrix-org/waterfall/pkg/worker" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v3" @@ -26,7 +26,7 @@ type VideoSubscription struct { controller SubscriptionController requestKeyFrameFn RequestKeyFrameFn - worker *common.Worker[rtp.Packet] + worker *worker.Worker[rtp.Packet] logger *logrus.Entry } @@ -70,7 +70,7 @@ func NewVideoSubscription( } // Configure the worker for the subscription. - workerConfig := common.WorkerConfig[rtp.Packet]{ + workerConfig := worker.Config[rtp.Packet]{ ChannelSize: 1, // We really don't want to buffer old packets. Timeout: 2 * time.Second, OnTimeout: func() { @@ -82,7 +82,7 @@ func NewVideoSubscription( } // Start a worker for the subscription and create a subsription. - subscription.worker = common.StartWorker(workerConfig) + subscription.worker = worker.StartWorker(workerConfig) // Start reading and forwarding RTCP packets. go subscription.readRTCP() diff --git a/pkg/common/worker.go b/pkg/worker/worker.go similarity index 95% rename from pkg/common/worker.go rename to pkg/worker/worker.go index 81fbadc..3133457 100644 --- a/pkg/common/worker.go +++ b/pkg/worker/worker.go @@ -1,4 +1,4 @@ -package common +package worker import ( "errors" @@ -13,7 +13,7 @@ var ( ) // Configuration for the worker. -type WorkerConfig[T any] struct { +type Config[T any] struct { // The size of the bounded channel. ChannelSize int // Timeout after which `OnTimeout` is called. @@ -68,7 +68,7 @@ func (c *Worker[T]) Send(task T) error { // Starts a worker that periodically (specified by the configuration) executes a `c.OnTimeout` closure if // no tasks have been received on a channel for a `c.Timeout`. The worker will stop once the channel is closed, // i.e. once the user calls `Stop` explicitly. -func StartWorker[T any](c WorkerConfig[T]) *Worker[T] { +func StartWorker[T any](c Config[T]) *Worker[T] { // The channel that will be used to inform the worker about the reception of a task. // The worker will be stopped once the channel is closed. incoming := make(chan T, c.ChannelSize) diff --git a/pkg/common/worker_test.go b/pkg/worker/worker_test.go similarity index 61% rename from pkg/common/worker_test.go rename to pkg/worker/worker_test.go index 57b10bd..7e90eb7 100644 --- a/pkg/common/worker_test.go +++ b/pkg/worker/worker_test.go @@ -1,20 +1,20 @@ -package common_test +package worker_test import ( "testing" "time" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/worker" ) func BenchmarkWorker(b *testing.B) { - workerConfig := common.WorkerConfig[struct{}]{ + workerConfig := worker.Config[struct{}]{ ChannelSize: 1, Timeout: 2 * time.Second, OnTimeout: func() {}, OnTask: func(struct{}) {}, } - w := common.StartWorker(workerConfig) + w := worker.StartWorker(workerConfig) for n := 0; n < b.N; n++ { w.Send(struct{}{}) From 20b5085039a5d5d6d2f26f50ed3e666320c507f5 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 2 Feb 2023 22:02:22 +0100 Subject: [PATCH 12/17] sink: move to the `channel` module The `common` package is an antipattern. --- pkg/{common => channel}/sink.go | 2 +- pkg/conference/matrix_message_processing.go | 4 ++-- pkg/conference/processing.go | 4 ++-- pkg/conference/start.go | 4 ++-- pkg/conference/state.go | 4 ++-- pkg/peer/peer.go | 6 +++--- 6 files changed, 12 insertions(+), 12 deletions(-) rename pkg/{common => channel}/sink.go (99%) diff --git a/pkg/common/sink.go b/pkg/channel/sink.go similarity index 99% rename from pkg/common/sink.go rename to pkg/channel/sink.go index 710f8d6..1943fcd 100644 --- a/pkg/common/sink.go +++ b/pkg/channel/sink.go @@ -1,4 +1,4 @@ -package common +package channel import ( "errors" diff --git a/pkg/conference/matrix_message_processing.go b/pkg/conference/matrix_message_processing.go index 7a23124..8931301 100644 --- a/pkg/conference/matrix_message_processing.go +++ b/pkg/conference/matrix_message_processing.go @@ -3,7 +3,7 @@ package conference import ( "time" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" @@ -47,7 +47,7 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call } sdpAnswer = answer } else { - messageSink := common.NewSink(id, c.peerMessages) + messageSink := channel.NewSink(id, c.peerMessages) peerConnection, answer, err := peer.NewPeer(c.connectionFactory, inviteEvent.Offer.SDP, messageSink, logger) if err != nil { diff --git a/pkg/conference/processing.go b/pkg/conference/processing.go index a089af4..3f64aa2 100644 --- a/pkg/conference/processing.go +++ b/pkg/conference/processing.go @@ -1,7 +1,7 @@ package conference import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "maunium.net/go/mautrix/event" @@ -32,7 +32,7 @@ func (c *Conference) processMessages() { } // Process a message from a local peer. -func (c *Conference) processPeerMessage(message common.Message[participant.ID, peer.MessageContent]) { +func (c *Conference) processPeerMessage(message channel.Message[participant.ID, peer.MessageContent]) { // Since Go does not support ADTs, we have to use a switch statement to // determine the actual type of the message. switch msg := message.Content.(type) { diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 1c9de50..0dd992d 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -17,7 +17,7 @@ limitations under the License. package conference import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" @@ -47,7 +47,7 @@ func StartConference( matrixWorker: newMatrixWorker(signaling), tracker: *participant.NewParticipantTracker(), streamsMetadata: make(event.CallSDPStreamMetadata), - peerMessages: make(chan common.Message[participant.ID, peer.MessageContent]), + peerMessages: make(chan channel.Message[participant.ID, peer.MessageContent]), matrixEvents: matrixEvents, conferenceDone: done, } diff --git a/pkg/conference/state.go b/pkg/conference/state.go index c40f9c0..7fae3d7 100644 --- a/pkg/conference/state.go +++ b/pkg/conference/state.go @@ -1,7 +1,7 @@ package conference import ( - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/webrtc_ext" @@ -22,7 +22,7 @@ type Conference struct { tracker participant.Tracker streamsMetadata event.CallSDPStreamMetadata - peerMessages chan common.Message[participant.ID, peer.MessageContent] + peerMessages chan channel.Message[participant.ID, peer.MessageContent] matrixEvents <-chan MatrixMessage } diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index 637bbc0..0b20650 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" - "github.com/matrix-org/waterfall/pkg/common" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/peer/state" "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/rtcp" @@ -31,7 +31,7 @@ var ( type Peer[ID comparable] struct { logger *logrus.Entry peerConnection *webrtc.PeerConnection - sink *common.SinkWithSender[ID, MessageContent] + sink *channel.SinkWithSender[ID, MessageContent] state *state.PeerState } @@ -39,7 +39,7 @@ type Peer[ID comparable] struct { func NewPeer[ID comparable]( connectionFactory *webrtc_ext.PeerConnectionFactory, sdpOffer string, - sink *common.SinkWithSender[ID, MessageContent], + sink *channel.SinkWithSender[ID, MessageContent], logger *logrus.Entry, ) (*Peer[ID], *webrtc.SessionDescription, error) { peerConnection, err := connectionFactory.CreatePeerConnection() From 94d0b6245e7d152827b9112dbc4d739ed9aeb8f1 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 2 Feb 2023 22:19:01 +0100 Subject: [PATCH 13/17] sink: handle an important edge-case --- pkg/channel/sink.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/channel/sink.go b/pkg/channel/sink.go index 1943fcd..5d9bffa 100644 --- a/pkg/channel/sink.go +++ b/pkg/channel/sink.go @@ -2,6 +2,7 @@ package channel import ( "errors" + "sync/atomic" ) var ErrSinkSealed = errors.New("The channel is sealed") @@ -21,9 +22,12 @@ type SinkWithSender[SenderType comparable, MessageType any] struct { // the channel here since we know that the sink is shared between multiple producers, // so we only disallow sending to the sink at this point. sealed chan struct{} + // A "mutex" that is used to protect the act of closing `sealed`. + alreadySealed atomic.Bool } // Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases. +// Note that since the current implementation accepts a channel, it's **not responsible** for closing it. func NewSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *SinkWithSender[S, M] { return &SinkWithSender[S, M]{ sender: sender, @@ -34,6 +38,10 @@ func NewSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *S // Sends a message to the message sink. Blocks if the sink is full! func (s *SinkWithSender[S, M]) Send(message M) error { + if s.alreadySealed.Load() { + return ErrSinkSealed + } + messageWithSender := Message[S, M]{ Sender: s.sender, Content: message, @@ -48,10 +56,17 @@ func (s *SinkWithSender[S, M]) Send(message M) error { } // Seals the channel, which means that no messages could be sent via this channel. -// Any attempt to send a message would result in an error. This is similar to closing the -// channel except that we don't close the underlying channel (since there might be other -// senders that may want to use it). +// Any attempt to send a message after `Seal()` returns will result in an error. +// Note that it does not mean (does not guarantee) that any existing senders that are +// waiting on the send to unblock won't send the message to the recipient (this case +// can happen if buffered channels are used). The existing senders will either unblock +// at this point and get an error that the channel is sealed or will unblock by sending +// the message to the recipient (should the recipient be ready to consume at this point). func (s *SinkWithSender[S, M]) Seal() { + if !s.alreadySealed.CompareAndSwap(false, true) { + return + } + select { case <-s.sealed: return From 4ae95a8fa7494093344f443be72bf1224fe3d277 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Mon, 6 Feb 2023 15:32:33 +0100 Subject: [PATCH 14/17] conference: get back buffers This is a temporary measure until we can workaround bugs in Pion that lead to the deadlocks. --- pkg/conference/start.go | 2 +- pkg/peer/subscription/video.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 0dd992d..3ac57bd 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -47,7 +47,7 @@ func StartConference( matrixWorker: newMatrixWorker(signaling), tracker: *participant.NewParticipantTracker(), streamsMetadata: make(event.CallSDPStreamMetadata), - peerMessages: make(chan channel.Message[participant.ID, peer.MessageContent]), + peerMessages: make(chan channel.Message[participant.ID, peer.MessageContent], 100), matrixEvents: matrixEvents, conferenceDone: done, } diff --git a/pkg/peer/subscription/video.go b/pkg/peer/subscription/video.go index 58cbdd9..ea945c4 100644 --- a/pkg/peer/subscription/video.go +++ b/pkg/peer/subscription/video.go @@ -71,8 +71,8 @@ func NewVideoSubscription( // Configure the worker for the subscription. workerConfig := worker.Config[rtp.Packet]{ - ChannelSize: 1, // We really don't want to buffer old packets. - Timeout: 2 * time.Second, + ChannelSize: 32, + Timeout: 3 * time.Second, OnTimeout: func() { layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) logger.Warnf("No RTP on subscription %s (%s)", subscription.info.TrackID, layer) From eff418b53994a1bfe7dc31350baeed1bcc60c37e Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 8 Feb 2023 19:19:51 +0100 Subject: [PATCH 15/17] router: rename `RunRouter` to `StartRouter` So it better reflects the semantics (that the function does not block). --- cmd/sfu/main.go | 2 +- pkg/routing/router.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/sfu/main.go b/cmd/sfu/main.go index 17deaff..1fda598 100644 --- a/cmd/sfu/main.go +++ b/cmd/sfu/main.go @@ -104,7 +104,7 @@ func main() { matrixEvents := make(chan *event.Event) // Start a router that will receive events from the matrix client and route them to the appropriate conference. - routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference) + routing.StartRouter(matrixClient, connectionFactory, matrixEvents, config.Conference) // Start matrix client sync. This function will block until the sync fails. matrixClient.RunSyncing(func(e *event.Event) { diff --git a/pkg/routing/router.go b/pkg/routing/router.go index b3535b2..6c81f43 100644 --- a/pkg/routing/router.go +++ b/pkg/routing/router.go @@ -42,7 +42,7 @@ type Router struct { } // Creates a new instance of the SFU with the given configuration. -func RunRouter( +func StartRouter( matrix *signaling.MatrixClient, connectionFactory *webrtc_ext.PeerConnectionFactory, matrixEvents <-chan *event.Event, From b40d6adfe934e3fe83373c5af106ad0ec1cdb909 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 8 Feb 2023 19:24:25 +0100 Subject: [PATCH 16/17] signaling: don't panic on errors inside a package --- cmd/sfu/main.go | 10 ++++++---- pkg/signaling/client.go | 12 ++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/sfu/main.go b/cmd/sfu/main.go index 1fda598..823ab15 100644 --- a/cmd/sfu/main.go +++ b/cmd/sfu/main.go @@ -102,14 +102,16 @@ func main() { // Create a channel which we'll use to send events to the router. matrixEvents := make(chan *event.Event) + defer close(matrixEvents) // Start a router that will receive events from the matrix client and route them to the appropriate conference. routing.StartRouter(matrixClient, connectionFactory, matrixEvents, config.Conference) // Start matrix client sync. This function will block until the sync fails. - matrixClient.RunSyncing(func(e *event.Event) { - matrixEvents <- e - }) + if err := matrixClient.RunSync(func(e *event.Event) { matrixEvents <- e }); err != nil { + logrus.WithError(err).Fatal("matrix client sync failed") + return + } - close(matrixEvents) + logrus.Info("SFU stopped") } diff --git a/pkg/signaling/client.go b/pkg/signaling/client.go index 044d496..47f4a5e 100644 --- a/pkg/signaling/client.go +++ b/pkg/signaling/client.go @@ -1,6 +1,8 @@ package signaling import ( + "fmt" + "github.com/sirupsen/logrus" "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" @@ -34,11 +36,11 @@ func NewMatrixClient(config Config) *MatrixClient { } // Starts the Matrix client and connects to the homeserver, -// Returns only when the sync with Matrix fails. -func (m *MatrixClient) RunSyncing(callback func(*event.Event)) { +// Returns only when the sync with Matrix stops or fails. +func (m *MatrixClient) RunSync(callback func(*event.Event)) error { syncer, ok := m.client.Syncer.(*mautrix.DefaultSyncer) if !ok { - logrus.Panic("Syncer is not DefaultSyncer") + return fmt.Errorf("syncer is not a DefaultSyncer") } syncer.ParseEventContent = true @@ -61,7 +63,5 @@ func (m *MatrixClient) RunSyncing(callback func(*event.Event)) { // TODO: We may want to reconnect if `Sync()` fails instead of ending the SFU // as ending here will essentially drop all conferences which may not necessarily // be what we want for the existing running conferences. - if err := m.client.Sync(); err != nil { - logrus.WithError(err).Panic("Sync failed") - } + return m.client.Sync() } From 7c4ea8f41f480860b4d171d53ba96f0f50631037 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Wed, 8 Feb 2023 19:27:02 +0100 Subject: [PATCH 17/17] conference: don't store `done` channel in state --- pkg/conference/processing.go | 4 ++-- pkg/conference/start.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/conference/processing.go b/pkg/conference/processing.go index 3f64aa2..eecfc26 100644 --- a/pkg/conference/processing.go +++ b/pkg/conference/processing.go @@ -10,9 +10,9 @@ import ( // Listen on messages from incoming channels and process them. // This is essentially the main loop of the conference. // If this function returns, the conference is over. -func (c *Conference) processMessages() { +func (c *Conference) processMessages(signalDone chan struct{}) { // When the main loop of the conference ends, clean up the resources. - defer close(c.conferenceDone) + defer close(signalDone) defer c.matrixWorker.stop() for { diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 3ac57bd..cc749c3 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -38,7 +38,6 @@ func StartConference( userID id.UserID, inviteEvent *event.CallInviteEventContent, ) (<-chan struct{}, error) { - done := make(chan struct{}) conference := &Conference{ id: confID, config: config, @@ -49,7 +48,6 @@ func StartConference( streamsMetadata: make(event.CallSDPStreamMetadata), peerMessages: make(chan channel.Message[participant.ID, peer.MessageContent], 100), matrixEvents: matrixEvents, - conferenceDone: done, } participantID := participant.ID{UserID: userID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID} @@ -58,7 +56,8 @@ func StartConference( } // Start conference "main loop". - go conference.processMessages() + signalDone := make(chan struct{}) + go conference.processMessages(signalDone) - return done, nil + return signalDone, nil }