From b64d8943d3c58e295ef89d8820acdcd8c759751e Mon Sep 17 00:00:00 2001
From: Haiko Schol
Date: Mon, 4 Nov 2024 17:22:45 +0700
Subject: [PATCH] WIP

---
 dot/sync/configuration.go    |  7 ++-
 dot/sync/fullsync.go         |  2 +
 dot/sync/service.go          |  4 +-
 dot/sync/worker_pool.go      | 92 ++++++++++++++++++++++--------------
 dot/sync/worker_pool_test.go | 24 +++++++---
 5 files changed, 83 insertions(+), 46 deletions(-)

diff --git a/dot/sync/configuration.go b/dot/sync/configuration.go
index 9746a42236..f8ec4bc0dc 100644
--- a/dot/sync/configuration.go
+++ b/dot/sync/configuration.go
@@ -7,6 +7,8 @@ import (
 	"time"
 )
 
+const defaultNoPeersRetryDelay = time.Second * 10
+
 type ServiceConfig func(svc *SyncService)
 
 func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
@@ -21,8 +23,9 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
 		wpCapacity *= 2 // add some buffer
 
 		svc.workerPool = NewWorkerPool(WorkerPoolConfig{
-			Capacity:   wpCapacity,
-			MaxRetries: UnlimitedRetries,
+			Capacity:          wpCapacity,
+			MaxRetries:        UnlimitedRetries,
+			NoPeersRetryDelay: defaultNoPeersRetryDelay,
 		})
 	}
 }
diff --git a/dot/sync/fullsync.go b/dot/sync/fullsync.go
index caa7646ba3..77b784638e 100644
--- a/dot/sync/fullsync.go
+++ b/dot/sync/fullsync.go
@@ -182,7 +182,9 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
 	// This is safe as long as we are the only goroutine reading from the channel.
 	for len(results) > 0 {
 		readyBlocks := make([][]*types.BlockData, 0)
+		logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove
 		result := <-results
+		logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove
 
 		repChange, ignorePeer, validResp := validateResult(result, f.badBlocks)
 		if repChange != nil {
diff --git a/dot/sync/service.go b/dot/sync/service.go
index 9a87a474fe..24357af102 100644
--- a/dot/sync/service.go
+++ b/dot/sync/service.go
@@ -24,7 +24,6 @@ import (
 const (
 	waitPeersDefaultTimeout = 10 * time.Second
 	minPeersDefault         = 1
-	maxTaskRetries          = 5
 )
 
 var (
@@ -175,7 +174,6 @@ func (s *SyncService) Stop() error {
 
 func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
 	logger.Infof("receiving a block announce handshake from %s", from.String())
-	logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove
 	if err := s.workerPool.AddPeer(from); err != nil {
 		logger.Warnf("failed to add peer to worker pool: %s", err)
 		return err
@@ -288,6 +286,8 @@ func (s *SyncService) runStrategy() {
 		}
 
 		_ = s.workerPool.SubmitBatch(tasks)
+	} else {
+		logger.Info("SyncService.runStrategy(): worker pool is at capacity, not asking for more tasks") // TODO: remove
 	}
 
 	done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go
index d3f87e6903..69db24589d 100644
--- a/dot/sync/worker_pool.go
+++ b/dot/sync/worker_pool.go
@@ -8,12 +8,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"io"
 	"sync"
 	"time"
 
-	"github.com/ChainSafe/gossamer/dot/network"
-
 	"github.com/libp2p/go-libp2p/core/peer"
 )
 
@@ -87,8 +84,9 @@ type WorkerPool interface {
 }
 
 type WorkerPoolConfig struct {
-	Capacity   int
-	MaxRetries int
+	Capacity          int
+	MaxRetries        int
+	NoPeersRetryDelay time.Duration
 }
 
 // NewWorkerPool creates a new worker pool with the given configuration.
@@ -101,6 +99,7 @@ func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool {
 
 	return &workerPool{
 		maxRetries:   cfg.MaxRetries,
+		retryDelay:   cfg.NoPeersRetryDelay,
 		ignoredPeers: make(map[peer.ID]struct{}),
 		statuses:     make(map[BatchID]BatchStatus),
 		resChan:      make(chan TaskResult, cfg.Capacity),
@@ -114,6 +113,7 @@ type workerPool struct {
 	wg           sync.WaitGroup
 	maxRetries   int
+	retryDelay   time.Duration
 	peers        list.List
 	ignoredPeers map[peer.ID]struct{}
 	statuses     map[BatchID]BatchStatus
@@ -170,19 +170,7 @@ func (w *workerPool) AddPeer(who peer.ID) error {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
 
-	if _, ok := w.ignoredPeers[who]; ok {
-		return ErrPeerIgnored
-	}
-
-	for e := w.peers.Front(); e != nil; e = e.Next() {
-		if e.Value.(peer.ID) == who {
-			return nil
-		}
-	}
-
-	w.peers.PushBack(who)
-	logger.Tracef("peer added, total in the pool %d", w.peers.Len())
-	return nil
+	return w.addPeer(who)
 }
 
 // RemovePeer removes a peer from the worker pool.
@@ -213,6 +201,7 @@ func (w *workerPool) NumPeers() int {
 // Shutdown stops the worker pool and waits for all tasks to complete.
 func (w *workerPool) Shutdown() {
 	w.cancel()
+	close(w.resChan)
 	w.wg.Wait()
 }
 
@@ -230,6 +219,7 @@ func (w *workerPool) executeBatch(tasks []Task, bID BatchID) {
 		for {
 			select {
 			case <-w.ctx.Done():
+				close(batchResults)
 				return
 
 			case tr := <-batchResults:
@@ -248,13 +238,13 @@ func (w *workerPool) executeBatch(tasks []Task, bID BatchID) {
 
 func (w *workerPool) executeTask(task Task, ch chan TaskResult) {
 	if errors.Is(w.ctx.Err(), context.Canceled) {
-		logger.Tracef("[CANCELED] task=%s, shutting down", task.String())
+		logger.Infof("[CANCELED] task=%s, shutting down", task.String()) // TODO: change to debug lvl
 		return
 	}
 
 	who, err := w.reservePeer()
 	if errors.Is(err, ErrNoPeers) {
-		logger.Tracef("no peers available for task=%s", task.String())
+		logger.Infof("no peers available for task=%s", task.String()) // TODO: change to trace lvl
 		ch <- TaskResult{Task: task, Error: ErrNoPeers}
 		return
 	}
@@ -263,15 +253,11 @@ func (w *workerPool) executeTask(task Task, ch chan TaskResult) {
 
 	result, err := task.Do(who)
 	if err != nil {
-		logger.Tracef("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error())
+		logger.Debugf("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error())
 	} else {
-		logger.Tracef("[FINISHED] task=%s peer=%s", task.String(), who)
+		logger.Debugf("[FINISHED] task=%s peer=%s", task.String(), who)
 	}
 
-	w.mtx.Lock()
-	w.peers.PushBack(who)
-	w.mtx.Unlock()
-
 	ch <- TaskResult{
 		Task: task,
 		Who:  who,
@@ -295,6 +281,22 @@ func (w *workerPool) reservePeer() (who peer.ID, err error) {
 	return peerElement.Value.(peer.ID), nil
 }
 
+func (w *workerPool) addPeer(who peer.ID) error {
+	if _, ok := w.ignoredPeers[who]; ok {
+		return ErrPeerIgnored
+	}
+
+	for e := w.peers.Front(); e != nil; e = e.Next() {
+		if e.Value.(peer.ID) == who {
+			return nil
+		}
+	}
+
+	w.peers.PushBack(who)
+	logger.Tracef("peer added, total in the pool %d", w.peers.Len())
+	return nil
+}
+
 func (w *workerPool) removePeer(who peer.ID) {
 	var toRemove *list.Element
 	for e := w.peers.Front(); e != nil; e = e.Next() {
@@ -322,7 +324,9 @@ func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) {
 	tr.Completed = true
 	w.statuses[batchID].Success[tID] = tr
 
-	logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove
+	_ = w.addPeer(tr.Who)
+
+	logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl
 	w.resChan <- tr
 }
 
@@ -330,25 +334,31 @@ func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResul
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
 
+	if errors.Is(w.ctx.Err(), context.Canceled) {
+		logger.Infof("handleFailedTask(): worker pool canceled, not handling failed task") // TODO: remove
+		return
+	}
+
+	logger.Infof("handling failed task err: %v task: %s", tr.Error, tr.Task.ID()) // TODO: remove
+	delayRetry := false
 	tID := tr.Task.ID()
 
-	if oldTr, ok := w.statuses[batchID].Failed[tID]; ok {
-		// It is only considered a retry if the task was actually executed.
-		if !errors.Is(oldTr.Error, ErrNoPeers) {
-			if errors.Is(oldTr.Error, io.EOF) || errors.Is(oldTr.Error, network.ErrStreamReset) {
-				w.removePeer(oldTr.Who)
-				logger.Debugf("removed peer %s from the worker pool", oldTr.Who)
-			}
+	// It is only considered a retry if the task was actually executed.
+	if errors.Is(tr.Error, ErrNoPeers) {
+		delayRetry = true
+	} else {
+		w.ignoredPeers[tr.Who] = struct{}{}
+		if oldTr, ok := w.statuses[batchID].Failed[tID]; ok {
 
 			tr.Retries = oldTr.Retries + 1
-			tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries
 		}
 	}
 
+	tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries
 	w.statuses[batchID].Failed[tID] = tr
 
 	if tr.Completed {
-		logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove
+		logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl
 		w.resChan <- tr
 		return
 	}
@@ -357,6 +367,16 @@ func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResul
 	w.wg.Add(1)
 	go func() {
 		defer w.wg.Done()
+		if delayRetry {
+			logger.Infof("delaying retry of task %s", tr.Task.String()) // TODO: remove
+			timer := time.NewTimer(w.retryDelay)
+			select {
+			case <-timer.C:
+			case <-w.ctx.Done():
+				logger.Infof("in goroutine, worker pool canceled, not retrying task") // TODO: remove
+				return
+			}
+		}
 		w.executeTask(tr.Task, batchResults)
 	}()
 }
diff --git a/dot/sync/worker_pool_test.go b/dot/sync/worker_pool_test.go
index dac3f13b99..6b2634f7d8 100644
--- a/dot/sync/worker_pool_test.go
+++ b/dot/sync/worker_pool_test.go
@@ -55,6 +55,18 @@ func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) {
 	return tasks, peers
 }
 
+func makePool(maxRetries ...int) WorkerPool {
+	mr := 0
+	if len(maxRetries) > 0 {
+		mr = maxRetries[0]
+	}
+
+	return NewWorkerPool(WorkerPoolConfig{
+		MaxRetries:        mr,
+		NoPeersRetryDelay: time.Millisecond * 10,
+	})
+}
+
 func waitForCompletion(wp WorkerPool, numTasks int) {
 	resultsReceived := 0
 
@@ -73,7 +85,7 @@ func TestWorkerPoolHappyPath(t *testing.T) {
 
 	var setup = func() (WorkerPool, []Task) {
 		tasks, peers := makeTasksAndPeers(numTasks, 0)
-		wp := NewWorkerPool(WorkerPoolConfig{})
+		wp := makePool()
 
 		for _, who := range peers {
 			err := wp.AddPeer(who)
@@ -120,7 +132,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) {
 
 	t.Run("accepts_batch_without_any_peers", func(t *testing.T) {
 		tasks, _ := makeTasksAndPeers(numTasks, 0)
-		wp := NewWorkerPool(WorkerPoolConfig{})
+		wp := makePool()
 
 		wp.SubmitBatch(tasks)
 
@@ -129,7 +141,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) {
 
 	t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) {
 		tasks, peers := makeTasksAndPeers(numTasks, 0)
-		wp := NewWorkerPool(WorkerPoolConfig{})
+		wp := makePool()
 
 		assert.NoError(t, wp.AddPeer(peers[0]))
 		assert.NoError(t, wp.AddPeer(peers[1]))
@@ -145,7 +157,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) {
 
 	t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) {
 		_, peers := makeTasksAndPeers(numTasks, 0)
-		wp := NewWorkerPool(WorkerPoolConfig{})
+		wp := makePool()
 
 		for _, who := range peers {
 			err := wp.AddPeer(who)
@@ -178,7 +190,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) {
 	failTwice.err = taskErr
 	failTwice.succeedAfter = 2
-	wp = NewWorkerPool(WorkerPoolConfig{MaxRetries: maxRetries})
+	wp = makePool(maxRetries)
 	for _, who := range peers {
 		err := wp.AddPeer(who)
 		assert.NoError(t, err)
 	}
@@ -232,7 +244,7 @@ func TestWorkerPoolMultipleBatches(t *testing.T) {
 	b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks)
 	peers := append(b1Peers, b2Peers...)
 
-	wp := NewWorkerPool(WorkerPoolConfig{})
+	wp := makePool()
 	for _, who := range peers {
 		err := wp.AddPeer(who)
 		assert.NoError(t, err)