Skip to content

Commit

Permalink
fix: protect against send on closed channel panics
Browse files Browse the repository at this point in the history
Fixes #481
  • Loading branch information
agaffney committed Feb 4, 2024
1 parent 32d9713 commit c0c42bf
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
3 changes: 1 addition & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ func (c *Connection) shutdown() {
}
// Wait for other goroutines to finish
c.waitGroup.Wait()
// Close channels
// Close consumer error channel to signify connection shutdown
close(c.errorChan)
close(c.protoErrorChan)
// We can only close a channel once, so we have to jump through a few hoops
select {
// The channel is either closed or has an item pending
Expand Down
22 changes: 13 additions & 9 deletions muxer/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ func (m *Muxer) Stop() {
_ = m.conn.Close()
// Wait for other goroutines to shutdown
m.waitGroup.Wait()
// Close protocol receive channels
// We rely on the individual mini-protocols to close the sender channel
for _, protocolRoles := range m.protocolReceivers {
for _, recvChan := range protocolRoles {
close(recvChan)
}
}
// Close ErrorChan to signify to consumer that we're shutting down
close(m.errorChan)
})
Expand Down Expand Up @@ -161,7 +154,10 @@ func (m *Muxer) RegisterProtocol(
if !ok {
return
}
case msg := <-senderChan:
case msg, ok := <-senderChan:
if !ok {
return
}
if err := m.Send(msg); err != nil {
m.sendError(err)
return
Expand Down Expand Up @@ -200,7 +196,15 @@ func (m *Muxer) Send(msg *Segment) error {
// readLoop waits for incoming data on the connection, parses the segment, and passes it to the appropriate
// protocol
func (m *Muxer) readLoop() {
defer m.waitGroup.Done()
defer func() {
m.waitGroup.Done()
// Close receiver channels
for _, protocolRoles := range m.protocolReceivers {
for _, recvChan := range protocolRoles {
close(recvChan)
}
}
}()
started := false
for {
// Break out of read loop if we're shutting down
Expand Down
25 changes: 15 additions & 10 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ func (p *Protocol) Start() {
<-p.doneChan
// Wait for all other goroutines to finish
p.waitGroup.Wait()
// Close channels
close(p.sendQueueChan)
close(p.sendStateQueueChan)
close(p.recvReadyChan)
close(p.sendReadyChan)
// Cancel any timer
if p.stateTransitionTimer != nil {
p.stateTransitionTimer.Stop()
Expand Down Expand Up @@ -174,20 +169,30 @@ func (p *Protocol) SendMessage(msg Message) error {

// SendError sends an error to the handler in the Ouroboros object
func (p *Protocol) SendError(err error) {
p.config.ErrorChan <- err
select {
case p.config.ErrorChan <- err:
default:
// Discard error if the buffer is full
// The connection will get closed on the first error, so any
// additional errors are unnecessary
return
}
}

func (p *Protocol) sendLoop() {
defer p.waitGroup.Done()
defer func() {
p.waitGroup.Done()
// Close muxer send channel
// We are responsible for closing this channel as the sender, even through it
// was created by the muxer
close(p.muxerSendChan)
}()
var setNewState bool
var newState State
var err error
for {
select {
case <-p.doneChan:
// We are responsible for closing this channel as the sender, even through it
// was created by the muxer
close(p.muxerSendChan)
// Break out of send loop if we're shutting down
return
case <-p.sendReadyChan:
Expand Down

0 comments on commit c0c42bf

Please sign in to comment.