Skip to content

Commit

Permalink
refactor: finalize signaling and peer communication
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-abramov committed Nov 22, 2022
1 parent 4d6cee6 commit 25ba9e2
Show file tree
Hide file tree
Showing 14 changed files with 572 additions and 222 deletions.
10 changes: 6 additions & 4 deletions config.yaml.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
10 changes: 6 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ services:
environment:
# Set the `CONFIG` to the configuration you want.
CONFIG: |
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
118 changes: 47 additions & 71 deletions src/conference/conference.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,33 @@ package conference

import (
"github.com/matrix-org/waterfall/src/peer"
"github.com/matrix-org/waterfall/src/signaling"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)

// Configuration for the group conferences (calls).
type CallConfig struct {
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
// from the client for this duration, the connection is considered dead.
KeepAliveTimeout int
}

type Participant struct {
Peer *peer.Peer
Data *ParticipantData
}

type ParticipantData struct {
RemoteSessionID id.SessionID
StreamMetadata event.CallSDPStreamMetadata
}

type Conference struct {
conferenceID string
config *CallConfig
participants map[peer.ID]*Participant
participantsChannel peer.MessageChannel
logger *logrus.Entry
id string
config Config
signaling signaling.MatrixSignaling
participants map[peer.ID]*Participant
peerEventsStream chan peer.Message
logger *logrus.Entry
}

func NewConference(confID string, config *CallConfig) *Conference {
conference := new(Conference)
conference.config = config
conference.conferenceID = confID
conference.participants = make(map[peer.ID]*Participant)
conference.participantsChannel = make(peer.MessageChannel)
conference.logger = logrus.WithFields(logrus.Fields{
"conf_id": confID,
})
// NewConference creates a new conference for the given conference ID using the
// provided configuration and Matrix signaling backend, and starts the
// conference's message-processing goroutine before returning.
func NewConference(confID string, config Config, signaling signaling.MatrixSignaling) *Conference {
	conference := &Conference{
		id:               confID,
		config:           config,
		signaling:        signaling,
		participants:     make(map[peer.ID]*Participant),
		peerEventsStream: make(chan peer.Message),
		logger:           logrus.WithFields(logrus.Fields{"conf_id": confID}),
	}

	// Start conference "main loop" that reacts to events coming from peers.
	go conference.processMessages()
	return conference
}

Expand All @@ -66,41 +53,43 @@ func (c *Conference) OnNewParticipant(participantID peer.ID, inviteEvent *event.
// As per MSC3401, when the `session_id` field changes from an incoming `m.call.member` event,
// any existing calls from this device in this call should be terminated.
// TODO: Implement this.
/*
for _, participant := range c.participants {
if participant.data.DeviceID == inviteEvent.DeviceID {
if participant.data.RemoteSessionID == inviteEvent.SenderSessionID {
c.logger.WithFields(logrus.Fields{
"device_id": inviteEvent.DeviceID,
"session_id": inviteEvent.SenderSessionID,
}).Errorf("Found existing participant with equal DeviceID and SessionID")
return
} else {
participant.Terminate()
delete(c.participants, participant.data.UserID)
}
for id, participant := range c.participants {
if id.DeviceID == inviteEvent.DeviceID {
if participant.remoteSessionID == inviteEvent.SenderSessionID {
c.logger.WithFields(logrus.Fields{
"device_id": inviteEvent.DeviceID,
"session_id": inviteEvent.SenderSessionID,
}).Errorf("Found existing participant with equal DeviceID and SessionID")
return
} else {
participant.peer.Terminate()
}
}
*/
}

peer, _, err := peer.NewPeer(participantID, c.conferenceID, inviteEvent.Offer.SDP, c.participantsChannel)
peer, sdpOffer, err := peer.NewPeer(participantID, c.id, inviteEvent.Offer.SDP, c.peerEventsStream)
if err != nil {
c.logger.WithError(err).Errorf("Failed to create new peer")
return
}

participantData := &ParticipantData{
RemoteSessionID: inviteEvent.SenderSessionID,
StreamMetadata: inviteEvent.SDPStreamMetadata,
participant := &Participant{
id: participantID,
peer: peer,
remoteSessionID: inviteEvent.SenderSessionID,
streamMetadata: inviteEvent.SDPStreamMetadata,
publishedTracks: make(map[event.SFUTrackDescription]*webrtc.TrackLocalStaticRTP),
}

c.participants[participantID] = &Participant{Peer: peer, Data: participantData}
c.participants[participantID] = participant

// TODO: Send the SDP answer back to the participant's device.
recipient := participant.asMatrixRecipient()
streamMetadata := c.getStreamsMetadata(participantID)
c.signaling.SendSDPAnswer(recipient, streamMetadata, sdpOffer.SDP)
}

func (c *Conference) OnCandidates(peerID peer.ID, candidatesEvent *event.CallCandidatesEventContent) {
if participant := c.getParticipant(peerID); participant != nil {
if participant := c.getParticipant(peerID, nil); participant != nil {
// Convert the candidates to the WebRTC format.
candidates := make([]webrtc.ICECandidateInit, len(candidatesEvent.Candidates))
for i, candidate := range candidatesEvent.Candidates {
Expand All @@ -112,36 +101,23 @@ func (c *Conference) OnCandidates(peerID peer.ID, candidatesEvent *event.CallCan
}
}

participant.Peer.AddICECandidates(candidates)
participant.peer.AddICECandidates(candidates)
}
}

func (c *Conference) OnSelectAnswer(peerID peer.ID, selectAnswerEvent *event.CallSelectAnswerEventContent) {
if participant := c.getParticipant(peerID); participant != nil {
if participant := c.getParticipant(peerID, nil); participant != nil {
if selectAnswerEvent.SelectedPartyID != peerID.DeviceID.String() {
c.logger.WithFields(logrus.Fields{
"device_id": selectAnswerEvent.SelectedPartyID,
}).Errorf("Call was answered on a different device, kicking this peer")
participant.Peer.Terminate()
participant.peer.Terminate()
}
}
}

func (c *Conference) OnHangup(peerID peer.ID, hangupEvent *event.CallHangupEventContent) {
if participant := c.getParticipant(peerID); participant != nil {
participant.Peer.Terminate()
}
}

func (c *Conference) getParticipant(peerID peer.ID) *Participant {
participant, ok := c.participants[peerID]
if !ok {
c.logger.WithFields(logrus.Fields{
"user_id": peerID.UserID,
"device_id": peerID.DeviceID,
}).Errorf("Failed to find participant")
return nil
if participant := c.getParticipant(peerID, nil); participant != nil {
participant.peer.Terminate()
}

return participant
}
8 changes: 8 additions & 0 deletions src/conference/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package conference

// Config holds the configuration for the group conferences (calls).
// It is populated from the `conference` section of the YAML configuration.
type Config struct {
	// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
	// from the client for this duration, the connection is considered dead.
	// Mapped from the `timeout` YAML key; presumably expressed in seconds — TODO confirm.
	KeepAliveTimeout int `yaml:"timeout"`
}
179 changes: 179 additions & 0 deletions src/conference/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package conference

import (
"encoding/json"
"errors"

"github.com/matrix-org/waterfall/src/peer"
"maunium.net/go/mautrix/event"
)

// processMessages is the conference's "main loop": it drains the peer events
// stream and dispatches each incoming message until the channel is closed.
func (c *Conference) processMessages() {
	// Ranging over the channel is the idiomatic receive loop: unlike a bare
	// `<-c.peerEventsStream` inside `for {}`, it terminates cleanly when the
	// channel is closed instead of spinning on zero-valued messages forever.
	for message := range c.peerEventsStream {
		c.processPeerMessage(message)
	}
}

// processPeerMessage dispatches a single message received from one of the
// conference's peers. Since Go does not support ADTs, a type switch over the
// concrete message type is used to determine how to handle it.
//nolint:funlen
func (c *Conference) processPeerMessage(message peer.Message) {
	switch msg := message.(type) {
	case peer.JoinedTheCall:
		// Deliberate no-op for now: the participant has already been
		// registered in OnNewParticipant, and Go switch cases do not fall
		// through. TODO confirm whether any bookkeeping belongs here.

	case peer.LeftTheCall:
		delete(c.participants, msg.Sender)
		// TODO: Send new metadata about available streams to all participants.
		// TODO: Send the hangup event over the Matrix back to the user.

	case peer.NewTrackPublished:
		// Error strings are lowercase per Go convention (staticcheck ST1005).
		participant := c.getParticipant(msg.Sender, errors.New("new track published from unknown participant"))
		if participant == nil {
			return
		}

		// Published tracks are keyed by their (stream ID, track ID) pair.
		key := event.SFUTrackDescription{
			StreamID: msg.Track.StreamID(),
			TrackID:  msg.Track.ID(),
		}

		if _, ok := participant.publishedTracks[key]; ok {
			c.logger.Errorf("Track already published: %v", key)
			return
		}

		participant.publishedTracks[key] = msg.Track

	case peer.PublishedTrackFailed:
		participant := c.getParticipant(msg.Sender, errors.New("published track failed from unknown participant"))
		if participant == nil {
			return
		}

		delete(participant.publishedTracks, event.SFUTrackDescription{
			StreamID: msg.Track.StreamID(),
			TrackID:  msg.Track.ID(),
		})

		// TODO: Should we remove the local tracks from every subscriber as well? Or will it happen automatically?

	case peer.NewICECandidate:
		participant := c.getParticipant(msg.Sender, errors.New("ICE candidate from unknown participant"))
		if participant == nil {
			return
		}

		// Convert the WebRTC ICE candidate to the Matrix (JSON) representation.
		jsonCandidate := msg.Candidate.ToJSON()
		candidates := []event.CallCandidate{{
			Candidate:     jsonCandidate.Candidate,
			SDPMLineIndex: int(*jsonCandidate.SDPMLineIndex),
			SDPMID:        *jsonCandidate.SDPMid,
		}}
		c.signaling.SendICECandidates(participant.asMatrixRecipient(), candidates)

	case peer.ICEGatheringComplete:
		participant := c.getParticipant(msg.Sender, errors.New("received ICE complete from unknown participant"))
		if participant == nil {
			return
		}

		// Send an empty array of candidates to indicate that ICE gathering is complete.
		c.signaling.SendCandidatesGatheringFinished(participant.asMatrixRecipient())

	case peer.RenegotiationRequired:
		participant := c.getParticipant(msg.Sender, errors.New("renegotiation from unknown participant"))
		if participant == nil {
			return
		}

		toSend := event.SFUMessage{
			Op:       event.SFUOperationOffer,
			SDP:      msg.Offer.SDP,
			Metadata: c.getStreamsMetadata(participant.id),
		}

		// The send error was previously dropped; log it for consistency with
		// the DataChannelAvailable case below.
		if err := participant.sendDataChannelMessage(toSend); err != nil {
			c.logger.Errorf("Failed to send SFU message: %v", err)
			return
		}

	case peer.DataChannelMessage:
		participant := c.getParticipant(msg.Sender, errors.New("data channel message from unknown participant"))
		if participant == nil {
			return
		}

		var sfuMessage event.SFUMessage
		if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
			c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
			return
		}

		c.handleDataChannelMessage(participant, sfuMessage)

	case peer.DataChannelAvailable:
		participant := c.getParticipant(msg.Sender, errors.New("data channel available from unknown participant"))
		if participant == nil {
			return
		}

		// As soon as the data channel opens, push the current stream metadata
		// to the participant.
		toSend := event.SFUMessage{
			Op:       event.SFUOperationMetadata,
			Metadata: c.getStreamsMetadata(participant.id),
		}

		if err := participant.sendDataChannelMessage(toSend); err != nil {
			c.logger.Errorf("Failed to send SFU message to open data channel: %v", err)
			return
		}

	default:
		c.logger.Errorf("Unknown message type: %T", msg)
	}
}

// handleDataChannelMessage handles an `SFUMessage` event that arrived over the
// participant's data channel, dispatching on the message's operation.
func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessage event.SFUMessage) {
	switch sfuMessage.Op {
	case event.SFUOperationSelect:
		// Subscribe the participant to the tracks it asked to receive.
		for _, track := range c.getTracks(sfuMessage.Start) {
			if err := participant.peer.SubscribeToTrack(track); err != nil {
				c.logger.Errorf("Failed to subscribe to track: %v", err)
				return
			}
		}

	case event.SFUOperationAnswer:
		if err := participant.peer.NewSDPAnswerReceived(sfuMessage.SDP); err != nil {
			c.logger.Errorf("Failed to set SDP answer: %v", err)
			return
		}

	case event.SFUOperationPublish:
		// TODO: Clarify the semantics of publish (just a new sdp offer?).
	case event.SFUOperationUnpublish:
		// TODO: Clarify the semantics of unpublish (how is it different from publish?).
	case event.SFUOperationAlive:
		// TODO: Handle the heartbeat message here (updating the last timestamp etc).
	case event.SFUOperationMetadata:
		participant.streamMetadata = sfuMessage.Metadata

		// Inform all other participants about the new metadata.
		// BUG FIX: the previous loop variable shadowed `participant`, so
		// `id == participant.id` compared every entry's key against its own
		// ID — always true — and the broadcast skipped every recipient
		// instead of only the sender.
		for id, other := range c.participants {
			// Skip the sender of the metadata update.
			if id == participant.id {
				continue
			}

			toSend := event.SFUMessage{
				Op:       event.SFUOperationMetadata,
				Metadata: c.getStreamsMetadata(id),
			}

			if err := other.sendDataChannelMessage(toSend); err != nil {
				c.logger.Errorf("Failed to send SFU message: %v", err)
				return
			}
		}

	default:
		// Previously unknown operations were silently ignored; log them so
		// protocol mismatches are visible.
		c.logger.Errorf("Unknown SFU operation: %v", sfuMessage.Op)
	}
}
Loading

0 comments on commit 25ba9e2

Please sign in to comment.