From 6a84584b31cca9f7ba6928223f5f5861433174d6 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Mon, 4 Nov 2024 17:22:45 +0700 Subject: [PATCH] WIP --- dot/sync/configuration.go | 7 +++-- dot/sync/service.go | 2 -- dot/sync/worker_pool.go | 57 ++++++++++++++++++++++++------------ dot/sync/worker_pool_test.go | 24 +++++++++++---- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/dot/sync/configuration.go b/dot/sync/configuration.go index 9746a42236..f8ec4bc0dc 100644 --- a/dot/sync/configuration.go +++ b/dot/sync/configuration.go @@ -7,6 +7,8 @@ import ( "time" ) +const defaultNoPeersRetryDelay = time.Second * 10 + type ServiceConfig func(svc *SyncService) func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { @@ -21,8 +23,9 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { wpCapacity *= 2 // add some buffer svc.workerPool = NewWorkerPool(WorkerPoolConfig{ - Capacity: wpCapacity, - MaxRetries: UnlimitedRetries, + Capacity: wpCapacity, + MaxRetries: UnlimitedRetries, + NoPeersRetryDelay: defaultNoPeersRetryDelay, }) } } diff --git a/dot/sync/service.go b/dot/sync/service.go index 9a87a474fe..5305adca95 100644 --- a/dot/sync/service.go +++ b/dot/sync/service.go @@ -24,7 +24,6 @@ import ( const ( waitPeersDefaultTimeout = 10 * time.Second minPeersDefault = 1 - maxTaskRetries = 5 ) var ( @@ -175,7 +174,6 @@ func (s *SyncService) Stop() error { func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error { logger.Infof("receiving a block announce handshake from %s", from.String()) - logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove if err := s.workerPool.AddPeer(from); err != nil { logger.Warnf("failed to add peer to worker pool: %s", err) return err diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go index d3f87e6903..23ab8bad85 100644 --- a/dot/sync/worker_pool.go +++ b/dot/sync/worker_pool.go @@ -8,12 +8,9 @@ import ( "context" "errors" "fmt" - "io" "sync" "time" - "github.com/ChainSafe/gossamer/dot/network" - "github.com/libp2p/go-libp2p/core/peer" ) @@ -87,8 +84,9 @@ type WorkerPool interface { } type WorkerPoolConfig struct { - Capacity int - MaxRetries int + Capacity int + MaxRetries int + NoPeersRetryDelay time.Duration } // NewWorkerPool creates a new worker pool with the given configuration. @@ -101,6 +99,7 @@ func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool { return &workerPool{ maxRetries: cfg.MaxRetries, + retryDelay: cfg.NoPeersRetryDelay, ignoredPeers: make(map[peer.ID]struct{}), statuses: make(map[BatchID]BatchStatus), resChan: make(chan TaskResult, cfg.Capacity), @@ -114,6 +113,7 @@ type workerPool struct { wg sync.WaitGroup maxRetries int + retryDelay time.Duration peers list.List ignoredPeers map[peer.ID]struct{} statuses map[BatchID]BatchStatus @@ -213,6 +213,7 @@ func (w *workerPool) NumPeers() int { // Shutdown stops the worker pool and waits for all tasks to complete. func (w *workerPool) Shutdown() { w.cancel() + close(w.resChan) w.wg.Wait() } @@ -230,6 +231,7 @@ func (w *workerPool) executeBatch(tasks []Task, bID BatchID) { for { select { case <-w.ctx.Done(): + close(batchResults) return case tr := <-batchResults: @@ -248,13 +250,13 @@ func (w *workerPool) executeBatch(tasks []Task, bID BatchID) { func (w *workerPool) executeTask(task Task, ch chan TaskResult) { if errors.Is(w.ctx.Err(), context.Canceled) { - logger.Tracef("[CANCELED] task=%s, shutting down", task.String()) + logger.Infof("[CANCELED] task=%s, shutting down", task.String()) // TODO: change to debug lvl return } who, err := w.reservePeer() if errors.Is(err, ErrNoPeers) { - logger.Tracef("no peers available for task=%s", task.String()) + logger.Infof("no peers available for task=%s", task.String()) // TODO: change to trace lvl ch <- TaskResult{Task: task, Error: ErrNoPeers} return } @@ -263,9 +265,9 @@ func (w *workerPool) executeTask(task Task, ch chan TaskResult) { result, err := task.Do(who) if err != nil { - logger.Tracef("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error()) + logger.Debugf("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error()) } else { - logger.Tracef("[FINISHED] task=%s peer=%s", task.String(), who) + logger.Debugf("[FINISHED] task=%s peer=%s", task.String(), who) } w.mtx.Lock() @@ -322,7 +324,7 @@ func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) { tr.Completed = true w.statuses[batchID].Success[tID] = tr - logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove + logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl w.resChan <- tr } @@ -330,25 +332,32 @@ func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResul w.mtx.Lock() defer w.mtx.Unlock() + if errors.Is(w.ctx.Err(), context.Canceled) { + logger.Infof("handleFailedTask(): worker pool canceled, not handling failed task") // TODO: remove + return + } + + logger.Infof("handling failed task err: %v task: %s", tr.Error, tr.Task.ID()) // TODO: remove + delayRetry := false tID := tr.Task.ID() - if oldTr, ok := w.statuses[batchID].Failed[tID]; ok { - // It is only considered a retry if the task was actually executed. - if !errors.Is(oldTr.Error, ErrNoPeers) { - if errors.Is(oldTr.Error, io.EOF) || errors.Is(oldTr.Error, network.ErrStreamReset) { - w.removePeer(oldTr.Who) - logger.Debugf("removed peer %s from the worker pool", oldTr.Who) - } + // It is only considered a retry if the task was actually executed. + if errors.Is(tr.Error, ErrNoPeers) { + delayRetry = true + } else { + w.removePeer(tr.Who) + logger.Infof("removed peer %s from the worker pool", tr.Who) // TODO: remove + if oldTr, ok := w.statuses[batchID].Failed[tID]; ok { tr.Retries = oldTr.Retries + 1 - tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries } } + tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries w.statuses[batchID].Failed[tID] = tr if tr.Completed { - logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove + logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl w.resChan <- tr return } @@ -357,6 +366,16 @@ func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResul w.wg.Add(1) go func() { defer w.wg.Done() + if delayRetry { + logger.Infof("delaying retry of task %s", tr.Task.String()) // TODO: remove + timer := time.NewTimer(w.retryDelay) + select { + case <-timer.C: + case <-w.ctx.Done(): + logger.Infof("in gorouting, worker pool canceled, not retrying task") // TODO: remove + return + } + } w.executeTask(tr.Task, batchResults) }() } diff --git a/dot/sync/worker_pool_test.go b/dot/sync/worker_pool_test.go index dac3f13b99..6b2634f7d8 100644 --- a/dot/sync/worker_pool_test.go +++ b/dot/sync/worker_pool_test.go @@ -55,6 +55,18 @@ func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) { return tasks, peers } +func makePool(maxRetries ...int) WorkerPool { + mr := 0 + if len(maxRetries) > 0 { + mr = maxRetries[0] + } + + return NewWorkerPool(WorkerPoolConfig{ + MaxRetries: mr, + NoPeersRetryDelay: time.Millisecond * 10, + }) +} + func waitForCompletion(wp WorkerPool, numTasks int) { resultsReceived := 0 @@ -73,7 +85,7 @@ func TestWorkerPoolHappyPath(t *testing.T) { var setup = func() (WorkerPool, []Task) { tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) @@ -120,7 +132,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("accepts_batch_without_any_peers", func(t *testing.T) { tasks, _ := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() wp.SubmitBatch(tasks) @@ -129,7 +141,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) { tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() assert.NoError(t, wp.AddPeer(peers[0])) assert.NoError(t, wp.AddPeer(peers[1])) @@ -145,7 +157,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) { _, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) @@ -178,7 +190,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) { failTwice.err = taskErr failTwice.succeedAfter = 2 - wp = NewWorkerPool(WorkerPoolConfig{MaxRetries: maxRetries}) + wp = makePool(maxRetries) for _, who := range peers { err := wp.AddPeer(who) assert.NoError(t, err) @@ -232,7 +244,7 @@ func TestWorkerPoolMultipleBatches(t *testing.T) { b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks) peers := append(b1Peers, b2Peers...) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) assert.NoError(t, err)