diff --git a/pkg/conference/matrix.go b/pkg/conference/matrix.go index f4ba6f5..ed6e853 100644 --- a/pkg/conference/matrix.go +++ b/pkg/conference/matrix.go @@ -6,14 +6,14 @@ import ( "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" ) type MessageContent interface{} type MatrixMessage struct { - UserID id.UserID - Content MessageContent + Sender ParticipantID + Content MessageContent + RawEvent *event.Event } // New participant tries to join the conference. @@ -21,6 +21,7 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent * logger := c.logger.WithFields(logrus.Fields{ "user_id": participantID.UserID, "device_id": participantID.DeviceID, + "call_id": participantID.CallID, }) logger.Info("Incoming call invite") diff --git a/pkg/conference/participant.go b/pkg/conference/participant.go index a7b9385..61b9df2 100644 --- a/pkg/conference/participant.go +++ b/pkg/conference/participant.go @@ -16,6 +16,7 @@ import ( type ParticipantID struct { UserID id.UserID DeviceID id.DeviceID + CallID string } // Participant represents a participant in the conference. @@ -32,6 +33,7 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient { return signaling.MatrixRecipient{ UserID: p.id.UserID, DeviceID: p.id.DeviceID, + CallID: p.id.CallID, RemoteSessionID: p.remoteSessionID, } } diff --git a/pkg/conference/processor.go b/pkg/conference/processor.go index 2a43726..fbaf0b9 100644 --- a/pkg/conference/processor.go +++ b/pkg/conference/processor.go @@ -196,13 +196,13 @@ func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessa func (c *Conference) processMatrixMessage(msg MatrixMessage) { switch ev := msg.Content.(type) { case *event.CallInviteEventContent: - c.onNewParticipant(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev) + c.onNewParticipant(msg.Sender, ev) case *event.CallCandidatesEventContent: - c.onCandidates(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev) + c.onCandidates(msg.Sender, ev) case *event.CallSelectAnswerEventContent: - c.onSelectAnswer(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev) + c.onSelectAnswer(msg.Sender, ev) case *event.CallHangupEventContent: - c.onHangup(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev) + c.onHangup(msg.Sender, ev) default: c.logger.Errorf("Unexpected event type: %T", ev) } diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 871360b..02d12b2 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -47,7 +47,7 @@ func StartConference( logger: logrus.WithFields(logrus.Fields{"conf_id": confID}), } - participantID := ParticipantID{UserID: UserID, DeviceID: inviteEvent.DeviceID} + participantID := ParticipantID{UserID: UserID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID} if err := conference.onNewParticipant(participantID, inviteEvent); err != nil { return nil, err } diff --git a/pkg/router.go b/pkg/router.go index 00bebb8..b2a9630 100644 --- a/pkg/router.go +++ b/pkg/router.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/waterfall/pkg/signaling" "github.com/sirupsen/logrus" "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" ) type Conference = common.Sender[conf.MatrixMessage] @@ -58,10 +59,11 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router case ConferenceEndedMessage: // Remove the conference that ended from the list. delete(router.conferenceSinks, msg.conferenceID) + // Process the message that was not read by the conference. - if len(msg.unread) > 0 { - // FIXME: We must handle these messages! - logrus.Warnf("Unread messages: %v", len(msg.unread)) + for _, msg := range msg.unread { + // TODO: We actually already know the type, so we can do this better. + router.handleMatrixEvent(msg.RawEvent) } } } @@ -72,22 +74,36 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router // Handles incoming To-Device events that the SFU receives from clients. func (r *Router) handleMatrixEvent(evt *event.Event) { - // Check if `conf_id` is present in the message (right messages do have it). - rawConferenceID, ok := evt.Content.Raw["conf_id"] - if !ok { - return - } + var ( + conferenceID string + callID string + deviceID string + userID = evt.Sender + ) - // Try to parse the conference ID without parsing the whole event. - conferenceID, ok := rawConferenceID.(string) - if !ok { - return + // Check if `conf_id` is present in the message (right messages do have it). + rawConferenceID, okConferenceId := evt.Content.Raw["conf_id"] + rawCallID, okCallId := evt.Content.Raw["call_id"] + rawDeviceID, okDeviceID := evt.Content.Raw["device_id"] + + if okConferenceId && okCallId && okDeviceID { + // Extract the conference ID from the message. + conferenceID, okConferenceId = rawConferenceID.(string) + callID, okCallId = rawCallID.(string) + deviceID, okDeviceID = rawDeviceID.(string) + + if !okConferenceId || !okCallId || !okDeviceID { + logrus.Warn("Ignoring invalid message without IDs") + return + } } logger := logrus.WithFields(logrus.Fields{ - "type": evt.Type.Type, - "user_id": evt.Sender.String(), - "conf_id": conferenceID, + "type": evt.Type.Type, + "user_id": userID, + "conf_id": conferenceID, + "call_id": callID, + "device_id": deviceID, }) conference := r.conferenceSinks[conferenceID] @@ -101,7 +117,7 @@ func (r *Router) handleMatrixEvent(evt *event.Event) { r.config, r.matrix.CreateForConference(conferenceID), createConferenceEndNotifier(conferenceID, r.channel), - evt.Sender, + userID, evt.Content.AsCallInvite(), ) if err != nil { @@ -122,9 +138,10 @@ func (r *Router) handleMatrixEvent(evt *event.Event) { // A helper function to deal with messages that can't be sent due to the conference closed. // Not a function due to the need to capture local environment. sendToConference := func(eventContent conf.MessageContent) { + sender := conf.ParticipantID{userID, id.DeviceID(deviceID), callID} // At this point the conference is not nil. // Let's check if the channel is still available. - if conference.Send(conf.MatrixMessage{UserID: evt.Sender, Content: eventContent}) != nil { + if conference.Send(conf.MatrixMessage{Content: eventContent, RawEvent: evt, Sender: sender}) != nil { // If sending failed, then the conference is over. delete(r.conferenceSinks, conferenceID) // Since we were not able to send the message, let's re-process it now. diff --git a/pkg/signaling/matrix.go b/pkg/signaling/matrix.go index ff0fe49..29d2681 100644 --- a/pkg/signaling/matrix.go +++ b/pkg/signaling/matrix.go @@ -46,6 +46,7 @@ type MatrixRecipient struct { UserID id.UserID DeviceID id.DeviceID RemoteSessionID id.SessionID + CallID string } // Interface that abstracts sending Send-to-device messages for the conference. @@ -63,7 +64,7 @@ func (m *MatrixForConference) SendSDPAnswer( ) { eventContent := &event.Content{ Parsed: event.CallAnswerEventContent{ - BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID), + BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID), Answer: event.CallData{ Type: "answer", SDP: sdp, @@ -78,7 +79,7 @@ func (m *MatrixForConference) SendSDPAnswer( func (m *MatrixForConference) SendICECandidates(recipient MatrixRecipient, candidates []event.CallCandidate) { eventContent := &event.Content{ Parsed: event.CallCandidatesEventContent{ - BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID), + BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID), Candidates: candidates, }, } @@ -89,7 +90,7 @@ func (m *MatrixForConference) SendICECandidates(recipient MatrixRecipient, candi func (m *MatrixForConference) SendCandidatesGatheringFinished(recipient MatrixRecipient) { eventContent := &event.Content{ Parsed: event.CallCandidatesEventContent{ - BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID), + BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID), Candidates: []event.CallCandidate{{Candidate: ""}}, }, } @@ -100,7 +101,7 @@ func (m *MatrixForConference) SendCandidatesGatheringFinished(recipient MatrixRe func (m *MatrixForConference) SendHangup(recipient MatrixRecipient, reason event.CallHangupReason) { eventContent := &event.Content{ Parsed: event.CallHangupEventContent{ - BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID), + BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID), Reason: reason, }, } @@ -108,9 +109,12 @@ func (m *MatrixForConference) SendHangup(recipient MatrixRecipient, reason event m.sendToDevice(recipient, event.CallHangup, eventContent) } -func (m *MatrixForConference) createBaseEventContent(destSessionID id.SessionID) event.BaseCallEventContent { +func (m *MatrixForConference) createBaseEventContent( + callID string, + destSessionID id.SessionID, +) event.BaseCallEventContent { return event.BaseCallEventContent{ - CallID: m.conferenceID, + CallID: callID, ConfID: m.conferenceID, DeviceID: m.client.DeviceID, SenderSessionID: LocalSessionID,