From 4e3263c0d90e1d81668aaaaaef8faa0c6279b918 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 24 Oct 2023 15:27:57 -0500 Subject: [PATCH] multi: Consolidate waitgroup logic. This switches the various subsystems over to use a new pattern that consolidates the waitgroup logic in a single location. This pattern is easier to reason about and less error prone since it's trivial to see at a glance that the calls to Done are happening as intended versus having to chase them down all over the code. --- connmgr/connmanager.go | 24 +++--- .../blockchain/indexers/indexsubscriber.go | 18 ++-- internal/mining/bgblktmplgenerator.go | 31 ++++--- internal/mining/cpuminer/cpuminer.go | 57 ++++++++----- internal/netsync/manager.go | 15 ++-- internal/rpcserver/rpcserver.go | 44 +++++----- internal/rpcserver/rpcwebsocket.go | 45 ++++++---- server.go | 85 ++++++++++--------- 8 files changed, 175 insertions(+), 144 deletions(-) diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 809b38b7fb..1e66da868c 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -219,9 +219,7 @@ type ConnManager struct { // with overall connection request count above. assignIDMtx sync.Mutex - // The following fields are used for lifecycle management of the connection - // manager. - wg sync.WaitGroup + // quit is used for lifecycle management of the connection manager. quit chan struct{} // cfg specifies the configuration of the connection manager and is set at @@ -458,7 +456,6 @@ out: } } - cm.wg.Done() log.Trace("Connection handler done") } @@ -685,7 +682,6 @@ func (cm *ConnManager) listenHandler(ctx context.Context, listener net.Listener) go cm.cfg.OnAccept(conn) } - cm.wg.Done() log.Tracef("Listener handler done for %s", listener.Addr()) } @@ -696,8 +692,12 @@ func (cm *ConnManager) Run(ctx context.Context) { log.Trace("Starting connection manager") // Start the connection handler goroutine. - cm.wg.Add(1) - go cm.connHandler(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + cm.connHandler(ctx) + wg.Done() + }() // Start all the listeners so long as the caller requested them and provided // a callback to be invoked when connections are accepted. @@ -706,8 +706,11 @@ func (cm *ConnManager) Run(ctx context.Context) { listeners = cm.cfg.Listeners } for _, listener := range cm.cfg.Listeners { - cm.wg.Add(1) - go cm.listenHandler(ctx, listener) + wg.Add(1) + go func(listener net.Listener) { + cm.listenHandler(ctx, listener) + wg.Done() + }(listener) } // Start enough outbound connections to reach the target number when not @@ -729,8 +732,7 @@ func (cm *ConnManager) Run(ctx context.Context) { // to recover anyways. _ = listener.Close() } - - cm.wg.Wait() + wg.Wait() log.Trace("Connection manager stopped") } diff --git a/internal/blockchain/indexers/indexsubscriber.go b/internal/blockchain/indexers/indexsubscriber.go index 75c9e83ba6..b9c869f501 100644 --- a/internal/blockchain/indexers/indexsubscriber.go +++ b/internal/blockchain/indexers/indexsubscriber.go @@ -128,7 +128,6 @@ type IndexSubscriber struct { mtx sync.Mutex ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup quit chan struct{} } @@ -345,7 +344,6 @@ func (s *IndexSubscriber) handleSyncSubscribers(ctx context.Context) { for { select { case <-ctx.Done(): - s.wg.Done() return case <-ticker.C: @@ -367,7 +365,6 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) { for { select { case <-ctx.Done(): - s.wg.Done() return case ntfn := <-s.c: @@ -394,9 +391,16 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) { // // This should be run as a goroutine. func (s *IndexSubscriber) Run(ctx context.Context) { - s.wg.Add(2) - go s.handleIndexUpdates(ctx) - go s.handleSyncSubscribers(ctx) + var wg sync.WaitGroup + wg.Add(2) + go func() { + s.handleIndexUpdates(ctx) + wg.Done() + }() + go func() { + s.handleSyncSubscribers(ctx) + wg.Done() + }() // Stop all the subscriptions and shutdown the subscriber when the context // is cancelled. @@ -411,6 +415,6 @@ func (s *IndexSubscriber) Run(ctx context.Context) { } s.mtx.Unlock() s.cancel() - s.wg.Wait() + wg.Wait() log.Tracef("Index subscriber stopped") } diff --git a/internal/mining/bgblktmplgenerator.go b/internal/mining/bgblktmplgenerator.go index 5d3e43df85..07dca77baa 100644 --- a/internal/mining/bgblktmplgenerator.go +++ b/internal/mining/bgblktmplgenerator.go @@ -223,7 +223,6 @@ func (wg *waitGroup) Wait() { // - Block direct access while generating new templates that will make the // current template stale (e.g. new parent or new votes) type BgBlkTmplGenerator struct { - wg sync.WaitGroup quit chan struct{} // These fields are provided by the caller when the generator is created and @@ -484,7 +483,6 @@ func (g *BgBlkTmplGenerator) notifySubscribersHandler(ctx context.Context) { g.subscriptionMtx.Unlock() case <-ctx.Done(): - g.wg.Done() return } } @@ -559,7 +557,6 @@ func (g *BgBlkTmplGenerator) regenQueueHandler(ctx context.Context) { } case <-ctx.Done(): - g.wg.Done() return } } @@ -1402,7 +1399,6 @@ func (g *BgBlkTmplGenerator) regenHandler(ctx context.Context) { g.genTemplateAsync(ctx, TURNewParent) case <-ctx.Done(): - g.wg.Done() return } } @@ -1477,8 +1473,6 @@ func (g *BgBlkTmplGenerator) ForceRegen() { // // This must be run as a goroutine. func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) { - defer g.wg.Done() - // Wait until the chain is synced when unsynced mining is not allowed. if !g.cfg.AllowUnsyncedMining && !g.cfg.IsCurrent() { ticker := time.NewTicker(time.Second) @@ -1515,14 +1509,27 @@ func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) { // necessary for it to function properly and blocks until the provided context // is cancelled. func (g *BgBlkTmplGenerator) Run(ctx context.Context) { - g.wg.Add(4) - go g.regenQueueHandler(ctx) - go g.regenHandler(ctx) - go g.notifySubscribersHandler(ctx) - go g.initialStartupHandler(ctx) + var wg sync.WaitGroup + wg.Add(4) + go func() { + g.regenQueueHandler(ctx) + wg.Done() + }() + go func() { + g.regenHandler(ctx) + wg.Done() + }() + go func() { + g.notifySubscribersHandler(ctx) + wg.Done() + }() + go func() { + g.initialStartupHandler(ctx) + wg.Done() + }() // Shutdown the generator when the context is cancelled. <-ctx.Done() close(g.quit) - g.wg.Wait() + wg.Wait() } diff --git a/internal/mining/cpuminer/cpuminer.go b/internal/mining/cpuminer/cpuminer.go index ded95e83f3..1e9fb04097 100644 --- a/internal/mining/cpuminer/cpuminer.go +++ b/internal/mining/cpuminer/cpuminer.go @@ -127,8 +127,6 @@ type CPUMiner struct { normalMining bool discreteMining bool submitBlockLock sync.Mutex - wg sync.WaitGroup - workerWg sync.WaitGroup updateNumWorkers chan struct{} queryHashesPerSec chan float64 speedStats map[uint64]*speedStats @@ -194,7 +192,6 @@ out: } } - m.wg.Done() log.Trace("CPU miner speed monitor done") } @@ -380,8 +377,6 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader, func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate, speedStats *speedStats, isBlake3PowActive bool) { - defer m.workerWg.Done() - for { if ctx.Err() != nil { return @@ -464,10 +459,12 @@ func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate, // It must be run as a goroutine. func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) { log.Trace("Starting generate blocks worker") - defer func() { - m.workerWg.Done() - log.Trace("Generate blocks worker done") - }() + defer log.Trace("Generate blocks worker done") + + // Separate waitgroup for solvers to ensure they are stopped prior to + // terminating the goroutine. + var solverWg sync.WaitGroup + defer solverWg.Wait() // Subscribe for block template updates and ensure the subscription is // stopped along with the worker. @@ -488,7 +485,8 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) { case templateNtfn := <-templateSub.C(): // Clean up the map that tracks the number of blocks mined on a // given parent whenever a template is received due to a new parent. - prevHash := templateNtfn.Template.Block.Header.PrevBlock + template := templateNtfn.Template + prevHash := template.Block.Header.PrevBlock if m.cfg.PermitConnectionlessMining { if templateNtfn.Reason == mining.TURNewParent { m.Lock() @@ -516,9 +514,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) { // Start another goroutine for the new template. solverCtx, solverCancel = context.WithCancel(ctx) - m.workerWg.Add(1) - go m.solver(solverCtx, templateNtfn.Template, &speedStats, - isBlake3PowActive) + solverWg.Add(1) + go func() { + m.solver(solverCtx, template, &speedStats, isBlake3PowActive) + solverWg.Done() + }() case <-ctx.Done(): // Ensure resources associated with the solver goroutine context are @@ -541,6 +541,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) { // // It must be run as a goroutine. func (m *CPUMiner) miningWorkerController(ctx context.Context) { + // Separate waitgroup for workers to ensure they are stopped prior to + // terminating the goroutine. + var workerWg sync.WaitGroup + defer workerWg.Wait() + // launchWorker groups common code to launch a worker for subscribing for // template updates and solving blocks. type workerState struct { @@ -554,8 +559,11 @@ func (m *CPUMiner) miningWorkerController(ctx context.Context) { cancel: wCancel, }) - m.workerWg.Add(1) - go m.generateBlocks(wCtx, curWorkerID) + workerWg.Add(1) + go func() { + m.generateBlocks(wCtx, curWorkerID) + workerWg.Done() + }() curWorkerID++ } @@ -603,10 +611,6 @@ out: break out } } - - // Wait until all workers shut down. - m.workerWg.Wait() - m.wg.Done() } // Run starts the CPU miner with zero workers which means it will be idle. It @@ -617,14 +621,21 @@ out: func (m *CPUMiner) Run(ctx context.Context) { log.Trace("Starting CPU miner in idle state") - m.wg.Add(2) - go m.speedMonitor(ctx) - go m.miningWorkerController(ctx) + var wg sync.WaitGroup + wg.Add(2) + go func() { + m.speedMonitor(ctx) + wg.Done() + }() + go func() { + m.miningWorkerController(ctx) + wg.Done() + }() // Shutdown the miner when the context is cancelled. <-ctx.Done() close(m.quit) - m.wg.Wait() + wg.Wait() log.Trace("CPU miner stopped") } diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index e69afeb554..f95e12ef2e 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -264,9 +264,7 @@ func (state *headerSyncState) resetStallTimeout() { // SyncManager provides a concurrency safe sync manager for handling all // incoming blocks. type SyncManager struct { - // The following fields are used for lifecycle management of the sync - // manager. - wg sync.WaitGroup + // quit is used for lifecycle management of the sync manager. quit chan struct{} // cfg specifies the configuration of the sync manager and is set at @@ -1527,7 +1525,6 @@ out: } } - m.wg.Done() log.Trace("Sync manager event handler done") } @@ -1792,13 +1789,17 @@ func (m *SyncManager) Run(ctx context.Context) { log.Trace("Starting sync manager") // Start the event handler goroutine. - m.wg.Add(1) - go m.eventHandler(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + m.eventHandler(ctx) + wg.Done() + }() // Shutdown the sync manager when the context is cancelled. <-ctx.Done() close(m.quit) - m.wg.Wait() + wg.Wait() log.Trace("Sync manager stopped") } diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 65535f2b33..37b4c4fe05 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -5014,7 +5014,6 @@ type Server struct { ntfnMgr NtfnManager statusLines map[int]string statusLock sync.RWMutex - wg sync.WaitGroup workState *workState helpCacher RPCHelpCacher requestProcessShutdown chan struct{} @@ -5141,21 +5140,6 @@ func (s *Server) writeHTTPResponseHeaders(req *http.Request, headers http.Header return err } -// shutdown terminates the processes of the rpc server. -func (s *Server) shutdown() error { - log.Warnf("RPC server shutting down") - for _, listener := range s.cfg.Listeners { - err := listener.Close() - if err != nil { - log.Errorf("Problem shutting down rpc: %v", err) - return err - } - } - s.wg.Wait() - log.Infof("RPC server shutdown complete") - return nil -} - // RequestedProcessShutdown returns a channel that is sent to when an // authorized RPC client requests the process to shutdown. If the request can // not be read immediately, it is dropped. @@ -5831,20 +5815,21 @@ func (s *Server) route(ctx context.Context) *http.Server { func (s *Server) Run(ctx context.Context) { log.Trace("Starting RPC server") server := s.route(ctx) + var wg sync.WaitGroup for _, listener := range s.cfg.Listeners { - s.wg.Add(1) + wg.Add(1) go func(listener net.Listener) { log.Infof("RPC server listening on %s", listener.Addr()) server.Serve(listener) log.Tracef("RPC listener done for %s", listener.Addr()) - s.wg.Done() + wg.Done() }(listener) } // Subscribe for async work notifications when background template // generation is enabled. if len(s.cfg.MiningAddrs) > 0 && s.cfg.BlockTemplater != nil { - s.wg.Add(1) + wg.Add(1) go func(s *Server, ctx context.Context) { templateSub := s.cfg.BlockTemplater.Subscribe() defer templateSub.Stop() @@ -5855,19 +5840,30 @@ func (s *Server) Run(ctx context.Context) { s.ntfnMgr.NotifyWork(templateNtfn) case <-ctx.Done(): - s.wg.Done() + wg.Done() return } } }(s, ctx) } + // Run the notification manager and wait for it to terminate. s.ntfnMgr.Run(ctx) - err := s.shutdown() - if err != nil { - log.Error(err) - return + + // Close all listeners and wait for all goroutines to terminate. + log.Warnf("RPC server shutting down") + var hasCloseErr bool + for _, listener := range s.cfg.Listeners { + err := listener.Close() + if err != nil { + log.Errorf("Failed to close listener %s: %v", listener.Addr(), err) + hasCloseErr = true + } + } + if !hasCloseErr { + wg.Wait() } + log.Info("RPC server shutdown complete") } // Config is a descriptor containing the RPC server configuration. diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index 8116337741..d36926c8ec 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -153,9 +153,7 @@ type wsNotificationManager struct { // Access channel for current number of connected clients. numClients chan int - // The following fields are used for lifecycle management of the - // notification manager. - wg sync.WaitGroup + // quit is used for lifecycle management of the notification manager. quit chan struct{} } @@ -172,7 +170,6 @@ func (m *wsNotificationManager) queueHandler(ctx context.Context) { select { case <-ctx.Done(): close(m.notificationMsgs) - m.wg.Done() return case n := <-m.queueNotification: @@ -567,7 +564,6 @@ out: for _, c := range clients { c.Disconnect() } - m.wg.Done() } // NumClients returns the number of clients actively being served. @@ -1204,14 +1200,21 @@ func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { // websocket client notifications. It blocks until the provided context is // cancelled. func (m *wsNotificationManager) Run(ctx context.Context) { - m.wg.Add(2) - go m.queueHandler(ctx) - go m.notificationHandler(ctx) + var wg sync.WaitGroup + wg.Add(2) + go func() { + m.queueHandler(ctx) + wg.Done() + }() + go func() { + m.notificationHandler(ctx) + wg.Done() + }() // Shutdown the notification manager when the context is cancelled. <-ctx.Done() close(m.quit) - m.wg.Wait() + wg.Wait() } // newWsNotificationManager returns a new notification manager ready for use. @@ -1283,7 +1286,6 @@ type wsClient struct { ntfnChan chan []byte sendChan chan wsResponse quit chan struct{} - wg sync.WaitGroup } // shouldLogReadError returns whether or not the passed error, which is expected @@ -1720,7 +1722,6 @@ out: // Ensure the connection is closed. c.Disconnect() - c.wg.Done() log.Tracef("Websocket client input handler done for %s", c.addr) } @@ -1808,7 +1809,6 @@ out: } } - c.wg.Done() log.Tracef("Websocket client notification queue handler done "+ "for %s", c.addr) } @@ -1837,7 +1837,6 @@ out: } } - c.wg.Done() log.Tracef("Websocket client output handler done for %s", c.addr) } @@ -1920,10 +1919,20 @@ func (c *wsClient) Run(ctx context.Context) { log.Tracef("Starting websocket client %s", c.addr) // Start processing input and output. - c.wg.Add(3) - go c.inHandler(ctx) - go c.notificationQueueHandler() - go c.outHandler() + var wg sync.WaitGroup + wg.Add(3) + go func() { + c.inHandler(ctx) + wg.Done() + }() + go func() { + c.notificationQueueHandler() + wg.Done() + }() + go func() { + c.outHandler() + wg.Done() + }() // Forcibly disconnect the websocket client when the context is cancelled // which also closes the quit channel and thus ensures all of the above @@ -1937,7 +1946,7 @@ func (c *wsClient) Run(ctx context.Context) { case <-c.quit: } - c.wg.Wait() + wg.Wait() } // newWebsocketClient returns a new websocket client given the notification diff --git a/server.go b/server.go index 866ab08955..462fa5f58b 100644 --- a/server.go +++ b/server.go @@ -496,7 +496,6 @@ type server struct { query chan interface{} relayInv chan relayMsg broadcast chan broadcastMsg - wg sync.WaitGroup nat *upnpNAT db database.DB timeSource blockchain.MedianTimeSource @@ -2354,7 +2353,6 @@ out: } s.addrManager.Stop() - s.wg.Done() srvrLog.Tracef("Peer handler done") } @@ -2980,7 +2978,6 @@ func (s *server) rebroadcastHandler(ctx context.Context) { case <-ctx.Done(): timer.Stop() - s.wg.Done() return } } @@ -3031,56 +3028,65 @@ func (s *server) Run(ctx context.Context) { srvrLog.Trace("Starting server") // Start the peer handler which in turn starts the address manager. - s.wg.Add(1) - go s.peerHandler(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + s.peerHandler(ctx) + wg.Done() + }() // Start the sync manager. - s.wg.Add(1) - go func(ctx context.Context, s *server) { + wg.Add(1) + go func() { s.syncManager.Run(ctx) - s.wg.Done() - }(ctx, s) + wg.Done() + }() // Query the seeders and start the connection manager. - s.wg.Add(1) - go func(ctx context.Context, s *server) { + wg.Add(1) + go func() { if !cfg.DisableSeeders { s.querySeeders(ctx) } s.connManager.Run(ctx) - s.wg.Done() - }(ctx, s) + wg.Done() + }() if s.nat != nil { - s.wg.Add(1) - go s.upnpUpdateThread(ctx) + wg.Add(1) + go func() { + s.upnpUpdateThread(ctx) + wg.Done() + }() } if !cfg.DisableRPC { - // Start the rebroadcastHandler, which ensures user tx received by - // the RPC server are rebroadcast until being included in a block. - s.wg.Add(1) - go s.rebroadcastHandler(ctx) - - s.wg.Add(1) - go func(ctx context.Context, s *server) { + // Start the RPC server and rebroadcast handler which ensures + // transactions submitted to the RPC server are rebroadcast until being + // included in a block. + wg.Add(2) + go func() { + s.rebroadcastHandler(ctx) + wg.Done() + }() + go func() { s.rpcServer.Run(ctx) - s.wg.Done() - }(ctx, s) + wg.Done() + }() } // Start the background block template generator and CPU miner if the config // provides a mining address. if len(cfg.miningAddrs) > 0 { - s.wg.Add(2) - go func(ctx context.Context, s *server) { + wg.Add(2) + go func() { s.bg.Run(ctx) - s.wg.Done() - }(ctx, s) - go func(ctx context.Context, s *server) { + wg.Done() + }() + go func() { s.cpuMiner.Run(ctx) - s.wg.Done() - }(ctx, s) + wg.Done() + }() // The CPU miner is started without any workers which means it is idle. // Start mining by setting the default number of workers when requested. @@ -3090,23 +3096,20 @@ func (s *server) Run(ctx context.Context) { } // Start the chain's index subscriber. - s.wg.Add(1) - go func(ctx context.Context, s *server) { + wg.Add(1) + go func() { s.indexSubscriber.Run(ctx) - s.wg.Done() - }(ctx, s) + wg.Done() + }() - // Wait until the server is signalled to shutdown. + // Shutdown the server when the context is cancelled. <-ctx.Done() s.shutdown.Store(true) srvrLog.Warnf("Server shutting down") - s.feeEstimator.Close() - s.chain.ShutdownUtxoCache() - - s.wg.Wait() + wg.Wait() srvrLog.Trace("Server stopped") } @@ -3210,8 +3213,6 @@ out: } else { srvrLog.Debugf("successfully disestablished UPnP port mapping") } - - s.wg.Done() } // standardScriptVerifyFlags returns the script flags that should be used when