Use Go in a more idiomatic way #126

Merged: 17 commits, Feb 9, 2023
11 changes: 8 additions & 3 deletions cmd/sfu/main.go
@@ -100,11 +100,16 @@ func main() {
return
}

// Create a router to route incoming To-Device messages to the right conference.
routerChannel := routing.NewRouter(matrixClient, connectionFactory, config.Conference)
// Create a channel which we'll use to send events to the router.
matrixEvents := make(chan *event.Event)

// Start a router that will receive events from the matrix client and route them to the appropriate conference.
routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference)

// Start matrix client sync. This function will block until the sync fails.
matrixClient.RunSyncing(func(e *event.Event) {
routerChannel <- e
matrixEvents <- e
})

Contributor:

matrixClient.RunSyncing runs endlessly until a panic happens. Is that right?

We could avoid this if we change the signature like this:

	matrixEvents, errc := matrixClient.RunSyncing()
	routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference)
	if err := <-errc; err != nil {
		log(err)
	}

Contributor (author):

Ah, I see. Do I get it right that the advantage is that we don't panic from inside a package, but do it in main instead?

Contributor:

We could decide that based on the kind of error that occurred. I think we should avoid panicking when it's not needed.

Contributor (author):

Yeah, makes sense, though we only have one type of error. But please confirm that my understanding is correct: the reason we're doing this is that it allows us to avoid panicking inside a package? If so, I agree, but couldn't we solve it more simply by just returning the error instead of panicking? I.e. I can slightly change RunSyncing() so that it does not panic once syncing has stopped, but instead returns the error to the caller.

Contributor:

RunSyncing() acts as the main loop. errc is a kind of done channel that can interrupt the loop gracefully. Additionally, errc carries the reason for the termination: nil means an intentional abort, while a non-nil error means an abort because of an issue.

Contributor (author):

Right. But we could simply return an error instead of panicking, which would have the same effect, right?

Anyway, I pushed a couple of new changes, so that:

  1. We don't panic anymore from inside the signaling package; we return an error instead (to be consistent with the API of the Matrix SDK there; it seems like a separate errc is not required in that particular case, but it could be added on top if we ever need it).
  2. I don't store the done channel inside the conference state anymore (as you pointed out, we only use it from the processMessages() loop to close it once the loop is over), so I just pass it as a local variable. That way no other function can [by accident] close a channel that it's not supposed to close 👍

PTAL 🙂


close(matrixEvents)
}
Binary file added pkg/.DS_Store
Binary file not shown.
85 changes: 85 additions & 0 deletions pkg/channel/sink.go
@@ -0,0 +1,85 @@
package channel

import (
"errors"
"sync/atomic"
)

var ErrSinkSealed = errors.New("The channel is sealed")

// SinkWithSender is a helper struct that allows sending messages to a message sink.
// The SinkWithSender abstracts a message sink with a certain sender, so that
// the sender does not have to be specified every time a message is sent.
// At the same time it guarantees that the caller can't alter the `sender`, which means that
// the sender can't impersonate another sender (and we guarantee this at compile time).
type SinkWithSender[SenderType comparable, MessageType any] struct {
// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios.
sender SenderType
// The message sink to which the messages are sent.
messageSink chan<- Message[SenderType, MessageType]
// A channel that is used to indicate that our channel is considered sealed. It's akin
// to a close indication without really closing the channel. We don't want to close
// the channel here since we know that the sink is shared between multiple producers,
// so we only disallow sending to the sink at this point.
sealed chan struct{}
// A "mutex" that is used to protect the act of closing `sealed`.
alreadySealed atomic.Bool
}

// Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases.
// Note that since the current implementation accepts a channel, it's **not responsible** for closing it.
func NewSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *SinkWithSender[S, M] {
return &SinkWithSender[S, M]{
sender: sender,
messageSink: messageSink,
sealed: make(chan struct{}),
}
}

// Sends a message to the message sink. Blocks if the sink is full!
func (s *SinkWithSender[S, M]) Send(message M) error {
if s.alreadySealed.Load() {
return ErrSinkSealed
}

messageWithSender := Message[S, M]{
Sender: s.sender,
Content: message,
}

select {
case <-s.sealed:
return ErrSinkSealed
case s.messageSink <- messageWithSender:
return nil
}

Contributor:

You do not know which of the two cases is executed at runtime. With `default` you make sure that the sealed case is checked first:

select {
	case <-s.sealed:
		return ErrSinkSealed
	default:
		s.messageSink <- messageWithSender
		return nil
	}

Contributor (author):

But isn't it semantically different? I.e., what if the sender gets to the s.messageSink <- messageWithSender part and blocks there, waiting for the reader to become ready? We must ensure that once the reader is not ready to accept new messages (e.g. not interested in them anymore), we don't continue waiting on the channel. Essentially, that's what I tried to describe; semantically I wanted this:

select {
    case <-done:
        return OkRecipientDoesNotExpectNewMessages
    case s.messageSink <- messageWithSender:
        return nil
}

Though your concern is valid, I tried to catch this case by checking the atomic variable at the beginning of the function (this does not guarantee that those who called Send() just before Close() won't send a value, though).

Contributor:

OK, got it: better to send one message too many than to block.

}

// Seals the channel, which means that no messages can be sent via this channel.
// Any attempt to send a message after `Seal()` returns will result in an error.
// Note that it does not mean (does not guarantee) that any existing senders that are
// waiting on the send to unblock won't send the message to the recipient (this case
// can happen if buffered channels are used). The existing senders will either unblock
// at this point and get an error that the channel is sealed or will unblock by sending
// the message to the recipient (should the recipient be ready to consume at this point).
func (s *SinkWithSender[S, M]) Seal() {
if !s.alreadySealed.CompareAndSwap(false, true) {
return
}

select {
case <-s.sealed:
return
default:
close(s.sealed)
}
}

// Messages that are sent from the peer to the conference in order to communicate with other peers.
// Since each peer is isolated from others, it can't influence the state of other peers directly.
type Message[SenderType comparable, MessageType any] struct {
// The sender of the message.
Sender SenderType
// The content of the message.
Content MessageType
}
70 changes: 0 additions & 70 deletions pkg/common/channel.go

This file was deleted.

83 changes: 0 additions & 83 deletions pkg/common/message_sink.go

This file was deleted.

33 changes: 15 additions & 18 deletions pkg/conference/matrix_message_processing.go
@@ -3,7 +3,7 @@ package conference
import (
"time"

"github.com/matrix-org/waterfall/pkg/common"
"github.com/matrix-org/waterfall/pkg/channel"
"github.com/matrix-org/waterfall/pkg/conference/participant"
"github.com/matrix-org/waterfall/pkg/peer"
"github.com/matrix-org/waterfall/pkg/signaling"
@@ -15,9 +15,8 @@ import (
type MessageContent interface{}

type MatrixMessage struct {
Sender participant.ID
Content MessageContent
RawEvent *event.Event
Sender participant.ID
Content MessageContent
}

// New participant tries to join the conference.
@@ -48,34 +47,32 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call
}
sdpAnswer = answer
} else {
messageSink := common.NewMessageSink(id, c.peerMessages)
messageSink := channel.NewSink(id, c.peerMessages)

peerConnection, answer, err := peer.NewPeer(c.connectionFactory, inviteEvent.Offer.SDP, messageSink, logger)
if err != nil {
logger.WithError(err).Errorf("Failed to process SDP offer")
return err
}

heartbeat := common.Heartbeat{
Interval: time.Duration(c.config.HeartbeatConfig.Interval) * time.Second,
Timeout: time.Duration(c.config.HeartbeatConfig.Timeout) * time.Second,
SendPing: func() bool {
return p.SendDataChannelMessage(event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
}) == nil
},
OnTimeout: func() {
messageSink.Send(peer.LeftTheCall{event.CallHangupKeepAliveTimeout})
},
pingEvent := event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
}

heartbeat := participant.HeartbeatConfig{
Interval: time.Duration(c.config.HeartbeatConfig.Interval) * time.Second,
Timeout: time.Duration(c.config.HeartbeatConfig.Timeout) * time.Second,
SendPing: func() bool { return p.SendDataChannelMessage(pingEvent) == nil },
OnTimeout: func() { messageSink.Send(peer.LeftTheCall{event.CallHangupKeepAliveTimeout}) },
}

p = &participant.Participant{
ID: id,
Peer: peerConnection,
Logger: logger,
RemoteSessionID: inviteEvent.SenderSessionID,
HeartbeatPong: heartbeat.Start(),
Pong: heartbeat.Start(),
}

c.tracker.AddParticipant(p)
8 changes: 4 additions & 4 deletions pkg/conference/matrix_worker.go
@@ -3,27 +3,27 @@ package conference
import (
"time"

"github.com/matrix-org/waterfall/pkg/common"
"github.com/matrix-org/waterfall/pkg/signaling"
"github.com/matrix-org/waterfall/pkg/worker"
"github.com/sirupsen/logrus"
"maunium.net/go/mautrix/id"
)

type matrixWorker struct {
worker *common.Worker[signaling.MatrixMessage]
worker *worker.Worker[signaling.MatrixMessage]
deviceID id.DeviceID
}

func newMatrixWorker(handler signaling.MatrixSignaler) *matrixWorker {
workerConfig := common.WorkerConfig[signaling.MatrixMessage]{
workerConfig := worker.Config[signaling.MatrixMessage]{
ChannelSize: 128,
Timeout: time.Hour,
OnTimeout: func() {},
OnTask: func(msg signaling.MatrixMessage) { handler.SendMessage(msg) },
}

matrixWorker := &matrixWorker{
worker: common.StartWorker(workerConfig),
worker: worker.StartWorker(workerConfig),
deviceID: handler.DeviceID(),
}

@@ -1,13 +1,13 @@
package common
package participant

import (
"time"
)

type Pong struct{}

// Heartbeat defines the configuration for a heartbeat.
type Heartbeat struct {
// HeartbeatConfig defines the configuration for a heartbeat.
type HeartbeatConfig struct {
// How often to send pings.
Interval time.Duration
// After which time to consider the communication stalled.
@@ -23,8 +23,8 @@ type Heartbeat struct {
// on `PongChannel` for `Timeout`. If no response is received within `Timeout`, `OnTimeout` is called.
// The goroutine stops once the channel is closed or upon handling the `OnTimeout`. The returned channel
// is what the caller should use to inform about the reception of a pong.
func (h *Heartbeat) Start() chan<- Pong {
pong := make(chan Pong, UnboundedChannelSize)
func (h *HeartbeatConfig) Start() chan<- Pong {
pong := make(chan Pong, 1)

go func() {
ticker := time.NewTicker(h.Interval)
@@ -52,7 +52,7 @@ func (h *Heartbeat) Start() chan<- Pong {

// Tries to send a ping message using `SendPing` and retry it if it fails.
// Returns `true` if the ping was sent successfully.
func (h *Heartbeat) sendWithRetry() bool {
func (h *HeartbeatConfig) sendWithRetry() bool {
const retries = 3
retryInterval := h.Timeout / retries
