Skip to content

Commit

Permalink
Merge pull request #480 from blinklabs-io/fix/protocol-deadlocks
Browse files Browse the repository at this point in the history
fix: handle protocol shutdown more cleanly
  • Loading branch information
agaffney authored Jan 27, 2024
2 parents f6b4e39 + 91b7aa4 commit 5515a20
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 25 deletions.
11 changes: 9 additions & 2 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
go func() {
<-c.Protocol.DoneChan()
close(c.blockChan)
close(c.startBatchResultChan)
}()
return c
}
Expand All @@ -95,7 +96,10 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.busyMutex.Unlock()
return err
}
err := <-c.startBatchResultChan
err, ok := <-c.startBatchResultChan
if !ok {
return protocol.ProtocolShuttingDownError
}
if err != nil {
c.busyMutex.Unlock()
return err
Expand All @@ -112,7 +116,10 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.busyMutex.Unlock()
return nil, err
}
err := <-c.startBatchResultChan
err, ok := <-c.startBatchResultChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
if err != nil {
c.busyMutex.Unlock()
return nil, err
Expand Down
18 changes: 15 additions & 3 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,15 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
if err := c.SendMessage(msg); err != nil {
return nil, err
}
tip := <-c.currentTipChan
tip, ok := <-c.currentTipChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
// Clear out intersect result channel to prevent blocking
<-c.intersectResultChan
_, ok = <-c.intersectResultChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
c.wantCurrentTip = false
return &tip, nil
}
Expand All @@ -171,6 +177,8 @@ func (c *Client) GetAvailableBlockRange(
gotIntersectResult := false
for {
select {
case <-c.DoneChan():
return start, end, protocol.ProtocolShuttingDownError
case tip := <-c.currentTipChan:
end = tip.Point
c.wantCurrentTip = false
Expand Down Expand Up @@ -200,6 +208,8 @@ func (c *Client) GetAvailableBlockRange(
}
for {
select {
case <-c.DoneChan():
return start, end, protocol.ProtocolShuttingDownError
case tip := <-c.currentTipChan:
end = tip.Point
c.wantCurrentTip = false
Expand Down Expand Up @@ -237,7 +247,9 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
if err := c.SendMessage(msg); err != nil {
return err
}
if err := <-c.intersectResultChan; err != nil {
if err, ok := <-c.intersectResultChan; !ok {
return protocol.ProtocolShuttingDownError
} else if err != nil {
return err
}
// Pipeline the initial block requests to speed things up a bit
Expand Down
18 changes: 3 additions & 15 deletions protocol/keepalive/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,9 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
// Start goroutine to cleanup resources on protocol shutdown
go func() {
<-c.Protocol.DoneChan()
// Stop any existing timer
if c.timer != nil {
// Stop timer and drain channel
if ok := c.timer.Stop(); !ok {
// Read item from channel, if available
select {
case <-c.timer.C:
default:
}
}
c.timer.Stop()
}
}()
return c
Expand All @@ -93,13 +87,7 @@ func (c *Client) sendKeepAlive() {
func (c *Client) startTimer() {
// Stop any existing timer
if c.timer != nil {
if ok := c.timer.Stop(); !ok {
// Read item from channel, if available
select {
case <-c.timer.C:
default:
}
}
c.timer.Stop()
}
// Create new timer
c.timer = time.AfterFunc(c.config.Period, c.sendKeepAlive)
Expand Down
10 changes: 8 additions & 2 deletions protocol/localstatequery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func (c *Client) acquire(point *common.Point) error {
if err := c.SendMessage(msg); err != nil {
return err
}
err := <-c.acquireResultChan
err, ok := <-c.acquireResultChan
if !ok {
return protocol.ProtocolShuttingDownError
}
return err
}

Expand All @@ -178,7 +181,10 @@ func (c *Client) runQuery(query interface{}, result interface{}) error {
if err := c.SendMessage(msg); err != nil {
return err
}
resultCbor := <-c.queryResultChan
resultCbor, ok := <-c.queryResultChan
if !ok {
return protocol.ProtocolShuttingDownError
}
if _, err := cbor.Decode(resultCbor, result); err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion protocol/localtxmonitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ func (c *Client) acquire() error {
return err
}
// Wait for reply
<-c.acquireResultChan
_, ok := <-c.acquireResultChan
if !ok {
return protocol.ProtocolShuttingDownError
}
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion protocol/localtxsubmission/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func (c *Client) SubmitTx(eraId uint16, tx []byte) error {
if err := c.SendMessage(msg); err != nil {
return err
}
err := <-c.submitResultChan
err, ok := <-c.submitResultChan
if !ok {
return protocol.ProtocolShuttingDownError
}
return err
}

Expand Down
5 changes: 4 additions & 1 deletion protocol/peersharing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func (c *Client) GetPeers(amount uint8) ([]interface{}, error) {
if err := c.SendMessage(msg); err != nil {
return nil, err
}
peers := <-c.sharePeersChan
peers, ok := <-c.sharePeersChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
return peers, nil
}

Expand Down

0 comments on commit 5515a20

Please sign in to comment.