diff --git a/protocol/blockfetch/client.go b/protocol/blockfetch/client.go index c1ac6767..bf83807c 100644 --- a/protocol/blockfetch/client.go +++ b/protocol/blockfetch/client.go @@ -32,6 +32,7 @@ type Client struct { startBatchResultChan chan error busyMutex sync.Mutex blockUseCallback bool + onceStart sync.Once onceStop sync.Once } @@ -69,15 +70,21 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { InitialState: StateIdle, } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - close(c.blockChan) - close(c.startBatchResultChan) - }() return c } +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + close(c.blockChan) + close(c.startBatchResultChan) + }() + }) +} + func (c *Client) Stop() error { var err error c.onceStop.Do(func() { diff --git a/protocol/chainsync/client.go b/protocol/chainsync/client.go index 94865a2c..341ecade 100644 --- a/protocol/chainsync/client.go +++ b/protocol/chainsync/client.go @@ -37,6 +37,7 @@ type Client struct { firstBlockChan chan common.Point wantIntersectPoint bool intersectPointChan chan common.Point + onceStart sync.Once onceStop sync.Once } @@ -88,18 +89,24 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { InitialState: stateIdle, } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - close(c.intersectResultChan) - close(c.readyForNextBlockChan) - close(c.currentTipChan) - close(c.firstBlockChan) - close(c.intersectPointChan) - }() return c } +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + close(c.intersectResultChan) + close(c.readyForNextBlockChan) + close(c.currentTipChan) + close(c.firstBlockChan) + close(c.intersectPointChan) + }() + }) +} + func (c *Client) messageHandler(msg protocol.Message) error { var err error switch msg.Type() { diff --git a/protocol/keepalive/client.go b/protocol/keepalive/client.go index 8d8adc0f..dee56517 100644 --- a/protocol/keepalive/client.go +++ b/protocol/keepalive/client.go @@ -58,22 +58,22 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { InitialState: StateClient, } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - // Stop any existing timer - c.timerMutex.Lock() - if c.timer != nil { - c.timer.Stop() - } - c.timerMutex.Unlock() - }() return c } func (c *Client) Start() { c.onceStart.Do(func() { c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + // Stop any existing timer + c.timerMutex.Lock() + if c.timer != nil { + c.timer.Stop() + } + c.timerMutex.Unlock() + }() c.sendKeepAlive() }) } diff --git a/protocol/localstatequery/client.go b/protocol/localstatequery/client.go index d6f88413..4f4f1f17 100644 --- a/protocol/localstatequery/client.go +++ b/protocol/localstatequery/client.go @@ -35,6 +35,7 @@ type Client struct { queryResultChan chan []byte acquireResultChan chan error currentEra int + onceStart sync.Once } // NewClient returns a new LocalStateQuery client object @@ -82,15 +83,21 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { c.enableGetRewardInfoPoolsBlock = true } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - close(c.queryResultChan) - close(c.acquireResultChan) - }() return c } +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + close(c.queryResultChan) + close(c.acquireResultChan) + }() + }) +} + func (c *Client) messageHandler(msg protocol.Message) error { var err error switch msg.Type() { diff --git a/protocol/localtxmonitor/client.go b/protocol/localtxmonitor/client.go index c2e953c1..dd574618 100644 --- a/protocol/localtxmonitor/client.go +++ b/protocol/localtxmonitor/client.go @@ -32,6 +32,7 @@ type Client struct { hasTxResultChan chan bool nextTxResultChan chan []byte getSizesResultChan chan MsgReplyGetSizesResult + onceStart sync.Once onceStop sync.Once } @@ -72,17 +73,23 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { InitialState: stateIdle, } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - close(c.acquireResultChan) - close(c.hasTxResultChan) - close(c.nextTxResultChan) - close(c.getSizesResultChan) - }() return c } +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + close(c.acquireResultChan) + close(c.hasTxResultChan) + close(c.nextTxResultChan) + close(c.getSizesResultChan) + }() + }) +} + func (c *Client) messageHandler(msg protocol.Message) error { var err error switch msg.Type() { diff --git a/protocol/localtxsubmission/client.go b/protocol/localtxsubmission/client.go index 50b52cff..df61f5db 100644 --- a/protocol/localtxsubmission/client.go +++ b/protocol/localtxsubmission/client.go @@ -28,6 +28,7 @@ type Client struct { config *Config busyMutex sync.Mutex submitResultChan chan error + onceStart sync.Once onceStop sync.Once } @@ -61,14 +62,20 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { InitialState: stateIdle, } c.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-c.Protocol.DoneChan() - close(c.submitResultChan) - }() return c } +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.Protocol.DoneChan() + close(c.submitResultChan) + }() + }) +} + func (c *Client) messageHandler(msg protocol.Message) error { var err error switch msg.Type() { diff --git a/protocol/txsubmission/server.go b/protocol/txsubmission/server.go index 1d750bf4..c9904e01 100644 --- a/protocol/txsubmission/server.go +++ b/protocol/txsubmission/server.go @@ -16,6 +16,7 @@ package txsubmission import ( "fmt" + "sync" "github.com/blinklabs-io/gouroboros/protocol" ) @@ -27,6 +28,7 @@ type Server struct { stateDone bool requestTxIdsResultChan chan []TxIdAndSize requestTxsResultChan chan []TxBody + onceStart sync.Once } func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { @@ -48,15 +50,21 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { InitialState: stateInit, } s.Protocol = protocol.New(protoConfig) - // Start goroutine to cleanup resources on protocol shutdown - go func() { - <-s.Protocol.DoneChan() - close(s.requestTxIdsResultChan) - close(s.requestTxsResultChan) - }() return s } +func (s *Server) Start() { + s.onceStart.Do(func() { + s.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-s.Protocol.DoneChan() + close(s.requestTxIdsResultChan) + close(s.requestTxsResultChan) + }() + }) +} + func (s *Server) messageHandler(msg protocol.Message) error { var err error switch msg.Type() {