Skip to content

Commit

Permalink
feat: make connection shutdown async
Browse files Browse the repository at this point in the history
Fixes #416
  • Loading branch information
agaffney committed Nov 5, 2023
1 parent 8e5061b commit d192246
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,6 @@ func (c *Connection) Close() error {
c.onceClose.Do(func() {
// Close doneChan to signify that we're shutting down
close(c.doneChan)
// Gracefully stop the muxer
if c.muxer != nil {
c.muxer.Stop()
}
// Wait for other goroutines to finish
c.waitGroup.Wait()
// Close channels
close(c.errorChan)
close(c.protoErrorChan)
// We can only close a channel once, so we have to jump through a few hoops
select {
// The channel is either closed or has an item pending
case _, ok := <-c.handshakeFinishedChan:
// We successfully retrieved an item
// This will probably never happen, but it doesn't hurt to cover this case
if ok {
close(c.handshakeFinishedChan)
}
// The channel is open and has no pending items
default:
close(c.handshakeFinishedChan)
}
})
return err
}
Expand Down Expand Up @@ -214,9 +192,41 @@ func (c *Connection) TxSubmission() *txsubmission.TxSubmission {
return c.txSubmission
}

// shutdown performs cleanup operations when the connection is shutdown, either due to explicit Close() or an error
func (c *Connection) shutdown() {
// Gracefully stop the muxer
if c.muxer != nil {
c.muxer.Stop()
}
// Wait for other goroutines to finish
c.waitGroup.Wait()
// Close channels
close(c.errorChan)
close(c.protoErrorChan)
// We can only close a channel once, so we have to jump through a few hoops
select {
// The channel is either closed or has an item pending
case _, ok := <-c.handshakeFinishedChan:
// We successfully retrieved an item
// This will probably never happen, but it doesn't hurt to cover this case
if ok {
close(c.handshakeFinishedChan)
}
// The channel is open and has no pending items
default:
close(c.handshakeFinishedChan)
}
}

// setupConnection establishes the muxer, configures and starts the handshake process, and initializes
// the appropriate mini-protocols
func (c *Connection) setupConnection() error {
// Start Goroutine to shutdown when doneChan is closed
go func() {
<-c.doneChan
c.shutdown()
}()
// Create muxer instance
c.muxer = muxer.New(c.conn)
// Start Goroutine to pass along errors from the muxer
c.waitGroup.Add(1)
Expand Down

0 comments on commit d192246

Please sign in to comment.