diff --git a/config.yaml.sample b/config.yaml.sample index 0b1b4df..e53b2e1 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -1,4 +1,6 @@ -homeserverurl: "http://localhost:8008" -userid: "@sfu:shadowfax" -accesstoken: "..." -timeout: 30 +matrix: + homeserverurl: "http://localhost:8008" + userid: "@sfu:shadowfax" + accesstoken: "..." +conference: + timeout: 30 diff --git a/docker-compose.yaml b/docker-compose.yaml index 90ffc6d..83d75e0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -9,7 +9,9 @@ services: environment: # Set the `CONFIG` to the configuration you want. CONFIG: | - homeserverurl: "http://localhost:8008" - userid: "@sfu:shadowfax" - accesstoken: "..." - timeout: 30 + matrix: + homeserverurl: "http://localhost:8008" + userid: "@sfu:shadowfax" + accesstoken: "..." + conference: + timeout: 30 diff --git a/src/conference/conference.go b/src/conference/conference.go index 290e7f9..1a2a243 100644 --- a/src/conference/conference.go +++ b/src/conference/conference.go @@ -18,46 +18,33 @@ package conference import ( "github.com/matrix-org/waterfall/src/peer" + "github.com/matrix-org/waterfall/src/signaling" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" ) -// Configuration for the group conferences (calls). -type CallConfig struct { - // Keep-alive timeout for WebRTC connections. If no keep-alive has been received - // from the client for this duration, the connection is considered dead. - KeepAliveTimeout int -} - -type Participant struct { - Peer *peer.Peer - Data *ParticipantData -} - -type ParticipantData struct { - RemoteSessionID id.SessionID - StreamMetadata event.CallSDPStreamMetadata -} - type Conference struct { - conferenceID string - config *CallConfig - participants map[peer.ID]*Participant - participantsChannel peer.MessageChannel - logger *logrus.Entry + id string + config Config + signaling signaling.MatrixSignaling + participants map[peer.ID]*Participant + peerEventsStream chan peer.Message + logger *logrus.Entry } -func NewConference(confID string, config *CallConfig) *Conference { - conference := new(Conference) - conference.config = config - conference.conferenceID = confID - conference.participants = make(map[peer.ID]*Participant) - conference.participantsChannel = make(peer.MessageChannel) - conference.logger = logrus.WithFields(logrus.Fields{ - "conf_id": confID, - }) +func NewConference(confID string, config Config, signaling signaling.MatrixSignaling) *Conference { + conference := &Conference{ + id: confID, + config: config, + signaling: signaling, + participants: make(map[peer.ID]*Participant), + peerEventsStream: make(chan peer.Message), + logger: logrus.WithFields(logrus.Fields{"conf_id": confID}), + } + + // Start conference "main loop". + go conference.processMessages() return conference } @@ -66,41 +53,43 @@ func (c *Conference) OnNewParticipant(participantID peer.ID, inviteEvent *event. // As per MSC3401, when the `session_id` field changes from an incoming `m.call.member` event, // any existing calls from this device in this call should be terminated. // TODO: Implement this. - /* - for _, participant := range c.participants { - if participant.data.DeviceID == inviteEvent.DeviceID { - if participant.data.RemoteSessionID == inviteEvent.SenderSessionID { - c.logger.WithFields(logrus.Fields{ - "device_id": inviteEvent.DeviceID, - "session_id": inviteEvent.SenderSessionID, - }).Errorf("Found existing participant with equal DeviceID and SessionID") - return - } else { - participant.Terminate() - delete(c.participants, participant.data.UserID) - } + for id, participant := range c.participants { + if id.DeviceID == inviteEvent.DeviceID { + if participant.remoteSessionID == inviteEvent.SenderSessionID { + c.logger.WithFields(logrus.Fields{ + "device_id": inviteEvent.DeviceID, + "session_id": inviteEvent.SenderSessionID, + }).Errorf("Found existing participant with equal DeviceID and SessionID") + return + } else { + participant.peer.Terminate() } } - */ + } - peer, _, err := peer.NewPeer(participantID, c.conferenceID, inviteEvent.Offer.SDP, c.participantsChannel) + peer, sdpOffer, err := peer.NewPeer(participantID, c.id, inviteEvent.Offer.SDP, c.peerEventsStream) if err != nil { c.logger.WithError(err).Errorf("Failed to create new peer") return } - participantData := &ParticipantData{ - RemoteSessionID: inviteEvent.SenderSessionID, - StreamMetadata: inviteEvent.SDPStreamMetadata, + participant := &Participant{ + id: participantID, + peer: peer, + remoteSessionID: inviteEvent.SenderSessionID, + streamMetadata: inviteEvent.SDPStreamMetadata, + publishedTracks: make(map[event.SFUTrackDescription]*webrtc.TrackLocalStaticRTP), } - c.participants[participantID] = &Participant{Peer: peer, Data: participantData} + c.participants[participantID] = participant - // TODO: Send the SDP answer back to the participant's device. + recipient := participant.asMatrixRecipient() + streamMetadata := c.getStreamsMetadata(participantID) + c.signaling.SendSDPAnswer(recipient, streamMetadata, sdpOffer.SDP) } func (c *Conference) OnCandidates(peerID peer.ID, candidatesEvent *event.CallCandidatesEventContent) { - if participant := c.getParticipant(peerID); participant != nil { + if participant := c.getParticipant(peerID, nil); participant != nil { // Convert the candidates to the WebRTC format. candidates := make([]webrtc.ICECandidateInit, len(candidatesEvent.Candidates)) for i, candidate := range candidatesEvent.Candidates { @@ -112,36 +101,23 @@ func (c *Conference) OnCandidates(peerID peer.ID, candidatesEvent *event.CallCan } } - participant.Peer.AddICECandidates(candidates) + participant.peer.AddICECandidates(candidates) } } func (c *Conference) OnSelectAnswer(peerID peer.ID, selectAnswerEvent *event.CallSelectAnswerEventContent) { - if participant := c.getParticipant(peerID); participant != nil { + if participant := c.getParticipant(peerID, nil); participant != nil { if selectAnswerEvent.SelectedPartyID != peerID.DeviceID.String() { c.logger.WithFields(logrus.Fields{ "device_id": selectAnswerEvent.SelectedPartyID, }).Errorf("Call was answered on a different device, kicking this peer") - participant.Peer.Terminate() + participant.peer.Terminate() } } } func (c *Conference) OnHangup(peerID peer.ID, hangupEvent *event.CallHangupEventContent) { - if participant := c.getParticipant(peerID); participant != nil { - participant.Peer.Terminate() - } -} - -func (c *Conference) getParticipant(peerID peer.ID) *Participant { - participant, ok := c.participants[peerID] - if !ok { - c.logger.WithFields(logrus.Fields{ - "user_id": peerID.UserID, - "device_id": peerID.DeviceID, - }).Errorf("Failed to find participant") - return nil + if participant := c.getParticipant(peerID, nil); participant != nil { + participant.peer.Terminate() } - - return participant } diff --git a/src/conference/config.go b/src/conference/config.go new file mode 100644 index 0000000..81952af --- /dev/null +++ b/src/conference/config.go @@ -0,0 +1,8 @@ +package conference + +// Configuration for the group conferences (calls). +type Config struct { + // Keep-alive timeout for WebRTC connections. If no keep-alive has been received + // from the client for this duration, the connection is considered dead. + KeepAliveTimeout int `yaml:"timeout"` +} diff --git a/src/conference/messages.go b/src/conference/messages.go new file mode 100644 index 0000000..e4b30ea --- /dev/null +++ b/src/conference/messages.go @@ -0,0 +1,179 @@ +package conference + +import ( + "encoding/json" + "errors" + + "github.com/matrix-org/waterfall/src/peer" + "maunium.net/go/mautrix/event" +) + +func (c *Conference) processMessages() { + for { + // Read a message from the stream (of type peer.Message) and process it. + message := <-c.peerEventsStream + c.processPeerMessage(message) + } +} + +//nolint:funlen +func (c *Conference) processPeerMessage(message peer.Message) { + // Since Go does not support ADTs, we have to use a switch statement to + // determine the actual type of the message. + switch msg := message.(type) { + case peer.JoinedTheCall: + case peer.LeftTheCall: + delete(c.participants, msg.Sender) + // TODO: Send new metadata about available streams to all participants. + // TODO: Send the hangup event over the Matrix back to the user. + + case peer.NewTrackPublished: + participant := c.getParticipant(msg.Sender, errors.New("New track published from unknown participant")) + if participant == nil { + return + } + + key := event.SFUTrackDescription{ + StreamID: msg.Track.StreamID(), + TrackID: msg.Track.ID(), + } + + if _, ok := participant.publishedTracks[key]; ok { + c.logger.Errorf("Track already published: %v", key) + return + } + + participant.publishedTracks[key] = msg.Track + + case peer.PublishedTrackFailed: + participant := c.getParticipant(msg.Sender, errors.New("Published track failed from unknown participant")) + if participant == nil { + return + } + + delete(participant.publishedTracks, event.SFUTrackDescription{ + StreamID: msg.Track.StreamID(), + TrackID: msg.Track.ID(), + }) + + // TODO: Should we remove the local tracks from every subscriber as well? Or will it happen automatically? + + case peer.NewICECandidate: + participant := c.getParticipant(msg.Sender, errors.New("ICE candidate from unknown participant")) + if participant == nil { + return + } + + // Convert WebRTC ICE candidate to Matrix ICE candidate. + jsonCandidate := msg.Candidate.ToJSON() + candidates := []event.CallCandidate{{ + Candidate: jsonCandidate.Candidate, + SDPMLineIndex: int(*jsonCandidate.SDPMLineIndex), + SDPMID: *jsonCandidate.SDPMid, + }} + c.signaling.SendICECandidates(participant.asMatrixRecipient(), candidates) + + case peer.ICEGatheringComplete: + participant := c.getParticipant(msg.Sender, errors.New("Received ICE complete from unknown participant")) + if participant == nil { + return + } + + // Send an empty array of candidates to indicate that ICE gathering is complete. + c.signaling.SendCandidatesGatheringFinished(participant.asMatrixRecipient()) + + case peer.RenegotiationRequired: + participant := c.getParticipant(msg.Sender, errors.New("Renegotiation from unknown participant")) + if participant == nil { + return + } + + toSend := event.SFUMessage{ + Op: event.SFUOperationOffer, + SDP: msg.Offer.SDP, + Metadata: c.getStreamsMetadata(participant.id), + } + + participant.sendDataChannelMessage(toSend) + + case peer.DataChannelMessage: + participant := c.getParticipant(msg.Sender, errors.New("Data channel message from unknown participant")) + if participant == nil { + return + } + + var sfuMessage event.SFUMessage + if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil { + c.logger.Errorf("Failed to unmarshal SFU message: %v", err) + return + } + + c.handleDataChannelMessage(participant, sfuMessage) + + case peer.DataChannelAvailable: + participant := c.getParticipant(msg.Sender, errors.New("Data channel available from unknown participant")) + if participant == nil { + return + } + + toSend := event.SFUMessage{ + Op: event.SFUOperationMetadata, + Metadata: c.getStreamsMetadata(participant.id), + } + + if err := participant.sendDataChannelMessage(toSend); err != nil { + c.logger.Errorf("Failed to send SFU message to open data channel: %v", err) + return + } + + default: + c.logger.Errorf("Unknown message type: %T", msg) + } +} + +// Handle the `SFUMessage` event from the DataChannel message. +func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessage event.SFUMessage) { + switch sfuMessage.Op { + case event.SFUOperationSelect: + // Get the tracks that correspond to the tracks that the participant wants to receive. + for _, track := range c.getTracks(sfuMessage.Start) { + if err := participant.peer.SubscribeToTrack(track); err != nil { + c.logger.Errorf("Failed to subscribe to track: %v", err) + return + } + } + + case event.SFUOperationAnswer: + if err := participant.peer.NewSDPAnswerReceived(sfuMessage.SDP); err != nil { + c.logger.Errorf("Failed to set SDP answer: %v", err) + return + } + + // TODO: Clarify the semantics of publish (just a new sdp offer?). + case event.SFUOperationPublish: + // TODO: Clarify the semantics of publish (how is it different from unpublish?). + case event.SFUOperationUnpublish: + // TODO: Handle the heartbeat message here (updating the last timestamp etc). + case event.SFUOperationAlive: + case event.SFUOperationMetadata: + participant.streamMetadata = sfuMessage.Metadata + + // Inform all participants about new metadata available. + for id, participant := range c.participants { + // Skip ourselves. + if id == participant.id { + continue + } + + toSend := event.SFUMessage{ + Op: event.SFUOperationMetadata, + Metadata: c.getStreamsMetadata(id), + } + + if err := participant.sendDataChannelMessage(toSend); err != nil { + c.logger.Errorf("Failed to send SFU message: %v", err) + return + } + } + } +} diff --git a/src/conference/participant.go b/src/conference/participant.go new file mode 100644 index 0000000..fc0ced3 --- /dev/null +++ b/src/conference/participant.go @@ -0,0 +1,91 @@ +package conference + +import ( + "encoding/json" + "errors" + + "github.com/matrix-org/waterfall/src/peer" + "github.com/matrix-org/waterfall/src/signaling" + "github.com/pion/webrtc/v3" + "github.com/sirupsen/logrus" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +var ErrInvalidSFUMessage = errors.New("invalid SFU message") + +type Participant struct { + id peer.ID + peer *peer.Peer + remoteSessionID id.SessionID + streamMetadata event.CallSDPStreamMetadata + publishedTracks map[event.SFUTrackDescription]*webrtc.TrackLocalStaticRTP +} + +func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient { + return signaling.MatrixRecipient{ + ID: p.id, + RemoteSessionID: p.remoteSessionID, + } +} + +func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) error { + jsonToSend, err := json.Marshal(toSend) + if err != nil { + return ErrInvalidSFUMessage + } + + if err := p.peer.SendOverDataChannel(string(jsonToSend)); err != nil { + // FIXME: We must buffer the message in this case and re-send it once the data channel is recovered! + // Or use Matrix signaling to inform the peer about the problem. + return err + } + + return nil +} + +func (c *Conference) getParticipant(peerID peer.ID, optionalErrorMessage error) *Participant { + participant, ok := c.participants[peerID] + if !ok { + logEntry := c.logger.WithFields(logrus.Fields{ + "user_id": peerID.UserID, + "device_id": peerID.DeviceID, + }) + + if optionalErrorMessage != nil { + logEntry.WithError(optionalErrorMessage) + } else { + logEntry.Error("Participant not found") + } + + return nil + } + + return participant +} + +func (c *Conference) getStreamsMetadata(forParticipant peer.ID) event.CallSDPStreamMetadata { + streamsMetadata := make(event.CallSDPStreamMetadata) + for id, participant := range c.participants { + if forParticipant != id { + for streamID, metadata := range participant.streamMetadata { + streamsMetadata[streamID] = metadata + } + } + } + + return streamsMetadata +} + +func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrtc.TrackLocalStaticRTP { + tracks := make([]*webrtc.TrackLocalStaticRTP, len(identifiers)) + for _, participant := range c.participants { + // Check if this participant has any of the tracks that we're looking for. + for _, identifier := range identifiers { + if track, ok := participant.publishedTracks[identifier]; ok { + tracks = append(tracks, track) + } + } + } + return tracks +} diff --git a/src/config/config.go b/src/config/config.go index 691ace9..d9adc83 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -5,22 +5,18 @@ import ( "fmt" "os" + "github.com/matrix-org/waterfall/src/conference" + "github.com/matrix-org/waterfall/src/signaling" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" - "maunium.net/go/mautrix/id" ) -// The mandatory SFU configuration. +// SFU configuration. type Config struct { - // The Matrix ID (MXID) of the SFU. - UserID id.UserID - // The ULR of the homeserver that SFU talks to. - HomeserverURL string - // The access token for the Matrix SDK. - AccessToken string - // Keep-alive timeout for WebRTC connections. If no keep-alive has been received - // from the client for this duration, the connection is considered dead. - KeepAliveTimeout int + // Matrix configuration. + Matrix signaling.Config `yaml:"matrix"` + // Conference (call) configuration. + Conference conference.Config `yaml:"conference"` } // Tries to load a config from the `CONFIG` environment variable. @@ -76,5 +72,12 @@ func LoadConfigFromString(configString string) (*Config, error) { return nil, fmt.Errorf("failed to unmarshal YAML file: %w", err) } + if config.Matrix.UserID == "" || + config.Matrix.HomeserverURL == "" || + config.Matrix.AccessToken == "" || + config.Conference.KeepAliveTimeout == 0 { + return nil, errors.New("invalid config values") + } + return &config, nil } diff --git a/src/main.go b/src/main.go index 320e6a0..cfffc62 100644 --- a/src/main.go +++ b/src/main.go @@ -65,7 +65,15 @@ func main() { config, err := config.LoadConfig(*configFilePath) if err != nil { logrus.WithError(err).Fatal("could not load config") + return } - signaling.RunServer(config) + // Create matrix client. + matrixClient := signaling.NewMatrixClient(config.Matrix) + + // Create a router to route incoming To-Device messages to the right conference. + router := newRouter(matrixClient, config.Conference) + + // Start matrix client sync. This function will block until the sync fails. + matrixClient.RunSync(router.handleMatrixEvent) } diff --git a/src/peer/channel.go b/src/peer/channel.go index 0f5e127..b0c573c 100644 --- a/src/peer/channel.go +++ b/src/peer/channel.go @@ -4,13 +4,13 @@ import ( "github.com/pion/webrtc/v3" ) -type MessageChannel chan interface{} +type Message = interface{} -type PeerJoinedTheCall struct { +type JoinedTheCall struct { Sender ID } -type PeerLeftTheCall struct { +type LeftTheCall struct { Sender ID } @@ -33,25 +33,16 @@ type ICEGatheringComplete struct { Sender ID } -type NewOffer struct { +type RenegotiationRequired struct { Sender ID Offer *webrtc.SessionDescription } -type DataChannelOpened struct { - Sender ID -} - -type DataChannelClosed struct { - Sender ID -} - type DataChannelMessage struct { Sender ID Message string } -type DataChannelError struct { +type DataChannelAvailable struct { Sender ID - Err error } diff --git a/src/peer/peer.go b/src/peer/peer.go index e9a3b8b..84506d0 100644 --- a/src/peer/peer.go +++ b/src/peer/peer.go @@ -15,6 +15,7 @@ var ( ErrCantSetLocalDescription = errors.New("can't set local description") ErrCantCreateLocalDescription = errors.New("can't create local description") ErrDataChannelNotAvailable = errors.New("data channel is not available") + ErrDataChannelNotReady = errors.New("data channel is not ready") ErrCantSubscribeToTrack = errors.New("can't subscribe to track") ) @@ -103,7 +104,7 @@ func (p *Peer) Terminate() { p.logger.WithError(err).Error("failed to close peer connection") } - p.notify <- PeerLeftTheCall{Sender: p.id} + p.notify <- LeftTheCall{Sender: p.id} } func (p *Peer) AddICECandidates(candidates []webrtc.ICECandidateInit) { @@ -133,9 +134,27 @@ func (p *Peer) SendOverDataChannel(json string) error { return ErrDataChannelNotAvailable } + if p.dataChannel.ReadyState() != webrtc.DataChannelStateOpen { + p.logger.Error("can't send data over data channel: data channel is not open") + return ErrDataChannelNotReady + } + if err := p.dataChannel.SendText(json); err != nil { p.logger.WithError(err).Error("failed to send data over data channel") } return nil } + +func (p *Peer) NewSDPAnswerReceived(sdpAnswer string) error { + err := p.peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdpAnswer, + }) + if err != nil { + p.logger.WithError(err).Error("failed to set remote description") + return ErrCantSetRemoteDecsription + } + + return nil +} diff --git a/src/peer/webrtc.go b/src/peer/webrtc.go index 4d97a80..889416c 100644 --- a/src/peer/webrtc.go +++ b/src/peer/webrtc.go @@ -93,7 +93,7 @@ func (p *Peer) onNegotiationNeeded() { return } - p.notify <- NewOffer{Sender: p.id, Offer: &offer} + p.notify <- RenegotiationRequired{Sender: p.id, Offer: &offer} } // A callback that is called once we receive an ICE connection state change for this peer connection. @@ -132,9 +132,9 @@ func (p *Peer) onConnectionStateChanged(state webrtc.PeerConnectionState) { switch state { case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateClosed: - p.notify <- PeerLeftTheCall{Sender: p.id} + p.notify <- LeftTheCall{Sender: p.id} case webrtc.PeerConnectionStateConnected: - p.notify <- PeerJoinedTheCall{Sender: p.id} + p.notify <- JoinedTheCall{Sender: p.id} } } @@ -154,12 +154,7 @@ func (p *Peer) onDataChannelReady(dc *webrtc.DataChannel) { dc.OnOpen(func() { p.logger.Info("data channel opened") - p.notify <- DataChannelOpened{Sender: p.id} - }) - - dc.OnClose(func() { - p.logger.Info("data channel closed") - p.notify <- DataChannelClosed{Sender: p.id} + p.notify <- DataChannelAvailable{Sender: p.id} }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -173,6 +168,9 @@ func (p *Peer) onDataChannelReady(dc *webrtc.DataChannel) { dc.OnError(func(err error) { p.logger.WithError(err).Error("data channel error") - p.notify <- DataChannelError{Sender: p.id, Err: err} + }) + + dc.OnClose(func() { + p.logger.Info("data channel closed") }) } diff --git a/src/signaling/signaling.go b/src/router.go similarity index 54% rename from src/signaling/signaling.go rename to src/router.go index ab9ea6e..e7b3899 100644 --- a/src/signaling/signaling.go +++ b/src/router.go @@ -14,53 +14,39 @@ See the License for the specific language governing permissions and limitations under the License. */ -package signaling +package main import ( - "errors" - "github.com/matrix-org/waterfall/src/conference" "github.com/matrix-org/waterfall/src/peer" + "github.com/matrix-org/waterfall/src/signaling" "github.com/sirupsen/logrus" - "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" ) -var ErrNoSuchConference = errors.New("no such conference") - -// The top-level state of the SignalingServer. -// Note that in Matrix MSCs, the term "focus" is used to refer to the SignalingServer. But since "focus" is a very -// generic name and only makes sense in a certain context, we use the term "SignalingServer" instead to avoid confusion -// given that this particular part is just the SignalingServer logic (and not the "focus" selection algorithm etc). -type SignalingServer struct { - // Matrix client. - client *mautrix.Client +// The top-level state of the Router. +type Router struct { + // Matrix matrix. + matrix *signaling.MatrixClient // All calls currently forwarded by this SFU. conferences map[string]*conference.Conference // Configuration for the calls. - config *conference.CallConfig + config conference.Config } // Creates a new instance of the SFU with the given configuration. -func NewSignalingServer(client *mautrix.Client, config *conference.CallConfig) *SignalingServer { - return &SignalingServer{ - client: client, +func newRouter(matrix *signaling.MatrixClient, config conference.Config) *Router { + return &Router{ + matrix: matrix, conferences: make(map[string]*conference.Conference), config: config, } } -// Handles To-Device events that the SFU receives from clients. +// Handles incoming To-Device events that the SFU receives from clients. // //nolint:funlen -func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) { - // We only care about to-device events. - if evt.Type.Class != event.ToDeviceEventType { - logrus.Warn("ignoring a not to-device event") - return - } - +func (r *Router) handleMatrixEvent(evt *event.Event) { // TODO: Don't create logger again and again, it might be a bit expensive. logger := logrus.WithFields(logrus.Fields{ "type": evt.Type.Type, @@ -68,11 +54,6 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) "conf_id": evt.Content.Raw["conf_id"], }) - if evt.Content.Raw["dest_session_id"] != LocalSessionID { - logger.WithField("dest_session_id", LocalSessionID).Warn("SessionID does not match our SessionID - ignoring") - return - } - switch evt.Type.Type { // Someone tries to participate in a call (join a call). case event.ToDeviceCallInvite.Type: @@ -83,9 +64,13 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) } // If there is an invitation sent and the conf does not exist, create one. - if conf := f.conferences[invite.ConfID]; conf == nil { + if conf := r.conferences[invite.ConfID]; conf == nil { logger.Infof("creating new conference %s", invite.ConfID) - f.conferences[invite.ConfID] = conference.NewConference(invite.ConfID, f.config) + r.conferences[invite.ConfID] = conference.NewConference( + invite.ConfID, + r.config, + r.matrix.CreateForConference(invite.ConfID), + ) } peerID := peer.ID{ @@ -94,7 +79,7 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) } // Inform conference about incoming participant. - f.conferences[invite.ConfID].OnNewParticipant(peerID, invite) + r.conferences[invite.ConfID].OnNewParticipant(peerID, invite) // Someone tries to send ICE candidates to the existing call. case event.ToDeviceCallCandidates.Type: @@ -104,7 +89,7 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) return } - conference := f.conferences[candidates.ConfID] + conference := r.conferences[candidates.ConfID] if conference == nil { logger.Errorf("received candidates for unknown conference %s", candidates.ConfID) return @@ -125,7 +110,7 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) return } - conference := f.conferences[selectAnswer.ConfID] + conference := r.conferences[selectAnswer.ConfID] if conference == nil { logger.Errorf("received select_answer for unknown conference %s", selectAnswer.ConfID) return @@ -146,7 +131,7 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) return } - conference := f.conferences[hangup.ConfID] + conference := r.conferences[hangup.ConfID] if conference == nil { logger.Errorf("received hangup for unknown conference %s", hangup.ConfID) return @@ -170,60 +155,3 @@ func (f *SignalingServer) onMatrixEvent(_ mautrix.EventSource, evt *event.Event) logger.Warnf("ignoring unexpected event: %s", evt.Type.Type) } } - -func (f *SignalingServer) createSDPAnswerEvent( - conferenceID string, - destSessionID id.SessionID, - peerID peer.ID, - sdp string, - streamMetadata event.CallSDPStreamMetadata, -) *event.Content { - return &event.Content{ - Parsed: event.CallAnswerEventContent{ - BaseCallEventContent: createBaseEventContent(conferenceID, f.client.DeviceID, peerID.DeviceID, destSessionID), - Answer: event.CallData{ - Type: "answer", - SDP: sdp, - }, - SDPStreamMetadata: streamMetadata, - }, - } -} - -func createBaseEventContent( - conferenceID string, - sfuDeviceID id.DeviceID, - destDeviceID id.DeviceID, - destSessionID id.SessionID, -) event.BaseCallEventContent { - return event.BaseCallEventContent{ - CallID: conferenceID, - ConfID: conferenceID, - DeviceID: sfuDeviceID, - SenderSessionID: LocalSessionID, - DestSessionID: destSessionID, - PartyID: string(destDeviceID), - Version: event.CallVersion("1"), - } -} - -// Sends a to-device event to the given user. -func (f *SignalingServer) sendToDevice(participantID peer.ID, ev *event.Event) { - // TODO: Don't create logger again and again, it might be a bit expensive. - logger := logrus.WithFields(logrus.Fields{ - "user_id": participantID.UserID, - "device_id": participantID.DeviceID, - }) - - sendRequest := &mautrix.ReqSendToDevice{ - Messages: map[id.UserID]map[id.DeviceID]*event.Content{ - participantID.UserID: { - participantID.DeviceID: &ev.Content, - }, - }, - } - - if _, err := f.client.SendToDevice(ev.Type, sendRequest); err != nil { - logger.Errorf("failed to send to-device event: %w", err) - } -} diff --git a/src/signaling/config.go b/src/signaling/config.go new file mode 100644 index 0000000..acd730c --- /dev/null +++ b/src/signaling/config.go @@ -0,0 +1,13 @@ +package signaling + +import "maunium.net/go/mautrix/id" + +// Configuration for the Matrix client. +type Config struct { + // The Matrix ID (MXID) of the SFU. + UserID id.UserID `yaml:"userid"` + // The ULR of the homeserver that SFU talks to. + HomeserverURL string `yaml:"homeserverurl"` + // The access token for the Matrix SDK. + AccessToken string `yaml:"accesstoken"` +} diff --git a/src/signaling/matrix.go b/src/signaling/matrix.go index d2d43a2..3c56a27 100644 --- a/src/signaling/matrix.go +++ b/src/signaling/matrix.go @@ -17,17 +17,20 @@ limitations under the License. package signaling import ( - "github.com/matrix-org/waterfall/src/conference" - "github.com/matrix-org/waterfall/src/config" + "github.com/matrix-org/waterfall/src/peer" "github.com/sirupsen/logrus" "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" ) const LocalSessionID = "sfu" -// Starts the Matrix client and connects to the homeserver, -// runs the SFU. Returns only when the sync with Matrix fails. -func RunServer(config *config.Config) { +type MatrixClient struct { + client *mautrix.Client +} + +func NewMatrixClient(config Config) *MatrixClient { client, err := mautrix.NewClient(config.HomeserverURL, config.UserID, config.AccessToken) if err != nil { logrus.WithError(err).Fatal("Failed to create client") @@ -45,23 +48,152 @@ func RunServer(config *config.Config) { logrus.WithField("device_id", whoami.DeviceID).Info("Identified SFU as DeviceID") client.DeviceID = whoami.DeviceID - focus := NewSignalingServer( - client, - &conference.CallConfig{KeepAliveTimeout: config.KeepAliveTimeout}, - ) + return &MatrixClient{ + client: client, + } +} - syncer, ok := client.Syncer.(*mautrix.DefaultSyncer) +// Starts the Matrix client and connects to the homeserver, +// Returns only when the sync with Matrix fails. +func (m *MatrixClient) RunSync(callback func(*event.Event)) { + syncer, ok := m.client.Syncer.(*mautrix.DefaultSyncer) if !ok { logrus.Panic("Syncer is not DefaultSyncer") } syncer.ParseEventContent = true - syncer.OnEvent(focus.onMatrixEvent) + syncer.OnEvent(func(_ mautrix.EventSource, evt *event.Event) { + // We only care about to-device events. + if evt.Type.Class != event.ToDeviceEventType { + logrus.Warn("ignoring a not to-device event") + return + } + + // We drop the messages if they are not meant for us. + if evt.Content.Raw["dest_session_id"] != LocalSessionID { + logrus.Warn("SessionID does not match our SessionID - ignoring") + return + } + + callback(evt) + }) // TODO: We may want to reconnect if `Sync()` fails instead of ending the SFU // as ending here will essentially drop all conferences which may not necessarily // be what we want for the existing running conferences. - if err = client.Sync(); err != nil { + if err := m.client.Sync(); err != nil { logrus.WithError(err).Panic("Sync failed") } } + +func (m *MatrixClient) CreateForConference(conferenceID string) *MatrixForConference { + return &MatrixForConference{ + client: m.client, + conferenceID: conferenceID, + } +} + +type MatrixRecipient struct { + ID peer.ID + RemoteSessionID id.SessionID +} + +type MatrixSignaling interface { + SendSDPAnswer(recipient MatrixRecipient, streamMetadata event.CallSDPStreamMetadata, sdp string) + SendICECandidates(recipient MatrixRecipient, candidates []event.CallCandidate) + SendCandidatesGatheringFinished(recipient MatrixRecipient) + SendHangup(recipient MatrixRecipient, reason event.CallHangupReason) +} + +type MatrixForConference struct { + client *mautrix.Client + conferenceID string +} + +func (m *MatrixForConference) SendSDPAnswer( + recipient MatrixRecipient, + streamMetadata event.CallSDPStreamMetadata, + sdp string, +) { + eventContent := &event.Content{ + Parsed: event.CallAnswerEventContent{ + BaseCallEventContent: m.createBaseEventContent(recipient.ID.DeviceID, recipient.RemoteSessionID), + Answer: event.CallData{ + Type: "answer", + SDP: sdp, + }, + SDPStreamMetadata: streamMetadata, + }, + } + + m.sendToDevice(recipient.ID, event.CallAnswer, eventContent) +} + +func (m *MatrixForConference) SendICECandidates(recipient MatrixRecipient, candidates []event.CallCandidate) { + eventContent := &event.Content{ + Parsed: event.CallCandidatesEventContent{ + BaseCallEventContent: m.createBaseEventContent(recipient.ID.DeviceID, recipient.RemoteSessionID), + Candidates: candidates, + }, + } + + m.sendToDevice(recipient.ID, event.CallCandidates, eventContent) +} + +func (m *MatrixForConference) SendCandidatesGatheringFinished(recipient MatrixRecipient) { + eventContent := &event.Content{ + Parsed: event.CallCandidatesEventContent{ + BaseCallEventContent: m.createBaseEventContent(recipient.ID.DeviceID, recipient.RemoteSessionID), + Candidates: []event.CallCandidate{{Candidate: ""}}, + }, + } + + m.sendToDevice(recipient.ID, event.CallCandidates, eventContent) +} + +func (m *MatrixForConference) SendHangup(recipient MatrixRecipient, reason event.CallHangupReason) { + eventContent := &event.Content{ + Parsed: event.CallHangupEventContent{ + BaseCallEventContent: m.createBaseEventContent(recipient.ID.DeviceID, recipient.RemoteSessionID), + Reason: reason, + }, + } + + m.sendToDevice(recipient.ID, event.CallHangup, eventContent) +} + +func (m *MatrixForConference) createBaseEventContent( + destDeviceID id.DeviceID, + destSessionID id.SessionID, +) event.BaseCallEventContent { + return event.BaseCallEventContent{ + CallID: m.conferenceID, + ConfID: m.conferenceID, + DeviceID: m.client.DeviceID, + SenderSessionID: LocalSessionID, + DestSessionID: destSessionID, + PartyID: string(destDeviceID), + Version: event.CallVersion("1"), + } +} + +// Sends a to-device event to the given user. +func (m *MatrixForConference) sendToDevice(participantID peer.ID, eventType event.Type, eventContent *event.Content) { + // TODO: Don't create logger again and again, it might be a bit expensive. + logger := logrus.WithFields(logrus.Fields{ + "user_id": participantID.UserID, + "device_id": participantID.DeviceID, + }) + + sendRequest := &mautrix.ReqSendToDevice{ + Messages: map[id.UserID]map[id.DeviceID]*event.Content{ + participantID.UserID: { + participantID.DeviceID: eventContent, + }, + }, + } + + if _, err := m.client.SendToDevice(eventType, sendRequest); err != nil { + logger.Errorf("failed to send to-device event: %w", err) + } +}