diff --git a/pkg/conference/participant/participant.go b/pkg/conference/participant/participant.go index 0532ff5..08f13ce 100644 --- a/pkg/conference/participant/participant.go +++ b/pkg/conference/participant/participant.go @@ -1,8 +1,6 @@ package participant import ( - "fmt" - "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" "github.com/sirupsen/logrus" @@ -39,12 +37,13 @@ func (p *Participant) AsMatrixRecipient() signaling.MatrixRecipient { func (p *Participant) SendDataChannelMessage(toSend event.Event) error { jsonToSend, err := toSend.MarshalJSON() if err != nil { - return fmt.Errorf("Failed to marshal data channel message: %w", err) + p.Logger.Errorf("Failed to marshal data channel message: %s", err) + return err } if err := p.Peer.SendOverDataChannel(string(jsonToSend)); err != nil { - // TODO: We must buffer the message in this case and re-send it once the data channel is recovered! - return fmt.Errorf("Failed to send data channel message: %w", err) + p.Logger.Errorf("Failed to send data channel message: %s", err) + return err } return nil diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index c5e259c..c37e52d 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -78,7 +78,9 @@ func (c *Conference) processRenegotiationRequiredMessage(sender participant.ID, return } - p.Logger.Info("Renegotiation started, sending SDP offer") + streamsMetadata := c.getAvailableStreamsFor(p.ID) + p.Logger.Infof("Renegotiating, sending SDP offer (%d streams)", len(streamsMetadata)) + p.SendDataChannelMessage(event.Event{ Type: event.FocusCallNegotiate, Content: event.Content{ @@ -87,7 +89,7 @@ func (c *Conference) processRenegotiationRequiredMessage(sender participant.ID, Type: event.CallDataType(msg.Offer.Type.String()), SDP: msg.Offer.SDP, }, - SDPStreamMetadata: c.getAvailableStreamsFor(p.ID), + SDPStreamMetadata: streamsMetadata, }, }, }) @@ -153,21 +155,16 @@ func (c *Conference) processTrackSubscriptionMessage( // Let's first handle the unsubscribe commands. for _, track := range msg.Unsubscribe { - p.Logger.Debugf("Unsubscribing from track %s", track.TrackID) c.tracker.Unsubscribe(p.ID, track.TrackID) } // Now let's handle the subscribe commands. for _, track := range msg.Subscribe { - p.Logger.Debugf("Subscribing to track %s", track.TrackID) - requirements := published.TrackMetadata{track.Width, track.Height} if err := c.tracker.Subscribe(p.ID, track.TrackID, requirements); err != nil { p.Logger.Errorf("Failed to subscribe to track %s: %v", track.TrackID, err) continue } - - p.Logger.Infof("Subscribed to track %s", track.TrackID) } } diff --git a/pkg/conference/publisher/publisher.go b/pkg/conference/publisher/publisher.go index fdd102f..5c4a5bd 100644 --- a/pkg/conference/publisher/publisher.go +++ b/pkg/conference/publisher/publisher.go @@ -54,7 +54,7 @@ func NewPublisher( return default: if err := publisher.forwardPacket(); err != nil { - log.Errorf("failed to read the frame from the track %s", err) + log.Errorf("track ended: %s", err) return } } diff --git a/pkg/conference/subscription/video.go b/pkg/conference/subscription/video.go index 989583e..8fd97aa 100644 --- a/pkg/conference/subscription/video.go +++ b/pkg/conference/subscription/video.go @@ -126,7 +126,7 @@ func (s *VideoSubscription) readRTCP() { if err != nil { if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) { layer := webrtc_ext.SimulcastLayer(s.currentLayer.Load()) - s.logger.Warnf("failed to read RTCP on track: %s (%s): %s", s.info.TrackID, layer, err) + s.logger.Debugf("failed to read RTCP on track: %s (%s): %s", s.info.TrackID, layer, err) s.worker.Stop() return } diff --git a/pkg/conference/track/track.go b/pkg/conference/track/track.go index 163b21e..fd71d51 100644 --- a/pkg/conference/track/track.go +++ b/pkg/conference/track/track.go @@ -51,7 +51,7 @@ func NewPublishedTrack[SubscriberID comparable]( logger *logrus.Entry, ) (*PublishedTrack[SubscriberID], error) { published := &PublishedTrack[SubscriberID]{ - logger: logger, + logger: logger.WithField("track", track.ID()), info: webrtc_ext.TrackInfoFromTrack(track), owner: trackOwner[SubscriberID]{ownerID, requestKeyFrame}, subscriptions: make(map[SubscriberID]subscription.Subscription), @@ -224,6 +224,8 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( p.video.publishers[layer].AddSubscription(sub) } + p.logger.Info("New subscriber:", subscriberID) + return nil }