From 6694cc3951bb35a72c010f816cac4e6eb42835b1 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Mon, 4 Nov 2024 17:22:45 +0700 Subject: [PATCH] WIP --- dot/sync/configuration.go | 17 +- dot/sync/fullsync.go | 17 +- dot/sync/service.go | 33 ++- dot/sync/worker_pool.go | 536 +++++++++++++++++++---------------- dot/sync/worker_pool_test.go | 24 +- 5 files changed, 353 insertions(+), 274 deletions(-) diff --git a/dot/sync/configuration.go b/dot/sync/configuration.go index 9746a42236..313c85f642 100644 --- a/dot/sync/configuration.go +++ b/dot/sync/configuration.go @@ -7,6 +7,8 @@ import ( "time" ) +const defaultNoPeersRetryDelay = time.Second * 10 + type ServiceConfig func(svc *SyncService) func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { @@ -14,15 +16,16 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { svc.currentStrategy = currentStrategy svc.defaultStrategy = defaultStrategy - wpCapacity := currentStrategy.NumOfTasks() - if defaultStrategy != nil { - wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks()) - } - wpCapacity *= 2 // add some buffer + wpCapacity := 100 // TODO set to MaxPeers config // currentStrategy.NumOfTasks() + //if defaultStrategy != nil { + // wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks()) + //} + //wpCapacity *= 2 // add some buffer svc.workerPool = NewWorkerPool(WorkerPoolConfig{ - Capacity: wpCapacity, - MaxRetries: UnlimitedRetries, + Capacity: wpCapacity, + MaxRetries: UnlimitedRetries, + NoPeersRetryDelay: defaultNoPeersRetryDelay, }) } } diff --git a/dot/sync/fullsync.go b/dot/sync/fullsync.go index caa7646ba3..c49fd0ab9c 100644 --- a/dot/sync/fullsync.go +++ b/dot/sync/fullsync.go @@ -180,9 +180,16 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) ( } // This is safe as long as we are the only goroutine reading from the channel. - for len(results) > 0 { + for result := range results { + if result.Error != nil { + f.requestQueue.PushBack(result.Task.(*syncTask).request) // TODO: check cast + continue + } + readyBlocks := make([][]*types.BlockData, 0) - result := <-results + //logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove + //result := <-results + //logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove repChange, ignorePeer, validResp := validateResult(result, f.badBlocks) if repChange != nil { @@ -440,9 +447,9 @@ type RequestResponseData struct { func validateResult(result TaskResult, badBlocks []string) (repChange *Change, blockPeer bool, validRes *RequestResponseData) { - if !result.Completed { - return - } + //if !result.Completed { + // return + //} task, ok := result.Task.(*syncTask) if !ok { diff --git a/dot/sync/service.go b/dot/sync/service.go index 9a87a474fe..cac9c9b342 100644 --- a/dot/sync/service.go +++ b/dot/sync/service.go @@ -24,7 +24,6 @@ import ( const ( waitPeersDefaultTimeout = 10 * time.Second minPeersDefault = 1 - maxTaskRetries = 5 ) var ( @@ -175,7 +174,6 @@ func (s *SyncService) Stop() error { func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error { logger.Infof("receiving a block announce handshake from %s", from.String()) - logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove if err := s.workerPool.AddPeer(from); err != nil { logger.Warnf("failed to add peer to worker pool: %s", err) return err @@ -204,8 +202,8 @@ func (s *SyncService) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnoun } func (s *SyncService) OnConnectionClosed(who peer.ID) { - logger.Tracef("removing peer worker: %s", who.String()) - s.workerPool.RemovePeer(who) + //logger.Tracef("removing peer worker: %s", who.String()) + //s.workerPool.RemovePeer(who) } func (s *SyncService) IsSynced() bool { @@ -275,20 +273,25 @@ func (s *SyncService) runStrategy() { bestBlockHeader.Hash().Short(), ) - if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() { - tasks, err := s.currentStrategy.NextActions() - if err != nil { - logger.Criticalf("current sync strategy next actions failed with: %s", err.Error()) - return - } + //if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() { + tasks, err := s.currentStrategy.NextActions() + if err != nil { + logger.Criticalf("current sync strategy next actions failed with: %s", err.Error()) + return + } - logger.Tracef("amount of tasks to process: %d", len(tasks)) - if len(tasks) == 0 { - return - } + logger.Tracef("amount of tasks to process: %d", len(tasks)) + if len(tasks) == 0 { + return + } - _ = s.workerPool.SubmitBatch(tasks) + if err := s.workerPool.SubmitBatch(tasks); err != nil { + logger.Debugf("unable to submit tasks to worker pool: %v", err) + return } + //} else { + // logger.Info("SyncService.runStrategy(): worker pool is at capacity, not asking for more tasks") // TODO: remove + //} done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results()) if err != nil { diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go index d3f87e6903..2b2bb074ec 100644 --- a/dot/sync/worker_pool.go +++ b/dot/sync/worker_pool.go @@ -4,16 +4,11 @@ package sync import ( - "container/list" "context" "errors" - "fmt" - "io" "sync" "time" - "github.com/ChainSafe/gossamer/dot/network" - "github.com/libp2p/go-libp2p/core/peer" ) @@ -35,60 +30,61 @@ type Task interface { } type TaskResult struct { - Task Task - Completed bool - Result Result - Error error - Retries int - Who peer.ID + Task Task + //Completed bool + Result Result + Error error + Retries int + Who peer.ID } func (t TaskResult) Failed() bool { return t.Error != nil } -type BatchStatus struct { - Failed map[TaskID]TaskResult - Success map[TaskID]TaskResult -} - -func (bs BatchStatus) Completed(todo int) bool { - if len(bs.Failed)+len(bs.Success) < todo { - return false - } - - for _, tr := range bs.Failed { - if !tr.Completed { - return false - } - } - - for _, tr := range bs.Success { - if !tr.Completed { - return false - } - } - - return true -} - -type BatchID string +//type BatchStatus struct { +// Failed map[TaskID]TaskResult +// Success map[TaskID]TaskResult +//} + +//func (bs BatchStatus) Completed(todo int) bool { +// if len(bs.Failed)+len(bs.Success) < todo { +// return false +// } +// +// for _, tr := range bs.Failed { +// if !tr.Completed { +// return false +// } +// } +// +// for _, tr := range bs.Success { +// if !tr.Completed { +// return false +// } +// } +// +// return true +//} + +//type BatchID string type WorkerPool interface { - SubmitBatch(tasks []Task) BatchID - GetBatch(id BatchID) (status BatchStatus, ok bool) + SubmitBatch(tasks []Task) error //BatchID + //GetBatch(id BatchID) (status BatchStatus, ok bool) Results() chan TaskResult - Capacity() int + //Capacity() int AddPeer(p peer.ID) error - RemovePeer(p peer.ID) + //RemovePeer(p peer.ID) IgnorePeer(p peer.ID) NumPeers() int Shutdown() } type WorkerPoolConfig struct { - Capacity int - MaxRetries int + Capacity int + MaxRetries int + NoPeersRetryDelay time.Duration } // NewWorkerPool creates a new worker pool with the given configuration. @@ -100,12 +96,14 @@ func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool { } return &workerPool{ - maxRetries: cfg.MaxRetries, + //maxRetries: cfg.MaxRetries, + //retryDelay: cfg.NoPeersRetryDelay, + peers: make(chan peer.ID, cfg.Capacity), ignoredPeers: make(map[peer.ID]struct{}), - statuses: make(map[BatchID]BatchStatus), - resChan: make(chan TaskResult, cfg.Capacity), - ctx: ctx, - cancel: cancel, + //statuses: make(map[BatchID]BatchStatus), + //resChan: make(chan TaskResult, cfg.Capacity), + ctx: ctx, + cancel: cancel, } } @@ -114,45 +112,78 @@ type workerPool struct { wg sync.WaitGroup maxRetries int - peers list.List + retryDelay time.Duration + peers chan peer.ID // list.List ignoredPeers map[peer.ID]struct{} - statuses map[BatchID]BatchStatus - resChan chan TaskResult - ctx context.Context - cancel context.CancelFunc + //statuses map[BatchID]BatchStatus + resChan chan TaskResult + ctx context.Context + cancel context.CancelFunc } // SubmitBatch accepts a list of tasks and immediately returns a batch ID. The batch ID can be used to query the status // of the batch using [GetBatchStatus]. -func (w *workerPool) SubmitBatch(tasks []Task) BatchID { - w.mtx.Lock() - defer w.mtx.Unlock() - - bID := BatchID(fmt.Sprintf("%d", time.Now().UnixNano())) - - w.statuses[bID] = BatchStatus{ - Failed: make(map[TaskID]TaskResult), - Success: make(map[TaskID]TaskResult), +func (w *workerPool) SubmitBatch(tasks []Task) error { //BatchID { + if err := w.ctx.Err(); err != nil { + return err } - w.wg.Add(1) + w.resChan = make(chan TaskResult, len(tasks)) + go func() { - defer w.wg.Done() - w.executeBatch(tasks, bID) + for _, t := range tasks { + w.wg.Add(1) + + go func() { + defer w.wg.Done() + + var who peer.ID + select { + case <-w.ctx.Done(): + return + case who = <-w.peers: + } + + result, err := t.Do(who) + _ = w.AddPeer(who) + w.resChan <- TaskResult{Task: t, Who: who, Result: result, Error: err} + }() + } + + w.wg.Wait() + close(w.resChan) + w.resChan = nil }() - return bID -} + //w.mtx.Lock() + //defer w.mtx.Unlock() -// GetBatch returns the status of a batch previously submitted using [SubmitBatch]. -func (w *workerPool) GetBatch(id BatchID) (status BatchStatus, ok bool) { - w.mtx.RLock() - defer w.mtx.RUnlock() + //bID := BatchID(fmt.Sprintf("%d", time.Now().UnixNano())) + + //w.statuses[bID] = BatchStatus{ + // Failed: make(map[TaskID]TaskResult), + // Success: make(map[TaskID]TaskResult), + //} - status, ok = w.statuses[id] - return + //w.wg.Add(1) + //go func() { + // defer w.wg.Done() + // w.executeBatch(tasks) //, bID) + //}() + + //return bID + return nil } +// GetBatch returns the status of a batch previously submitted using [SubmitBatch]. +//func (w *workerPool) GetBatch(id BatchID) (status BatchStatus, ok bool) { +// w.mtx.RLock() +// defer w.mtx.RUnlock() +// +// status, ok = w.statuses[id] +// return +//} + // Results returns a channel that can be used to receive the results of completed tasks. func (w *workerPool) Results() chan TaskResult { return w.resChan @@ -170,35 +201,23 @@ func (w *workerPool) AddPeer(who peer.ID) error { w.mtx.Lock() defer w.mtx.Unlock() - if _, ok := w.ignoredPeers[who]; ok { - return ErrPeerIgnored - } - - for e := w.peers.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == who { - return nil - } - } - - w.peers.PushBack(who) - logger.Tracef("peer added, total in the pool %d", w.peers.Len()) - return nil + return w.addPeer(who) } // RemovePeer removes a peer from the worker pool. -func (w *workerPool) RemovePeer(who peer.ID) { - w.mtx.Lock() - defer w.mtx.Unlock() - - w.removePeer(who) -} +//func (w *workerPool) RemovePeer(who peer.ID) { +// w.mtx.Lock() +// defer w.mtx.Unlock() +// +// w.removePeer(who) +//} // IgnorePeer removes a peer from the worker pool and prevents it from being added again. func (w *workerPool) IgnorePeer(who peer.ID) { w.mtx.Lock() defer w.mtx.Unlock() - w.removePeer(who) + //w.removePeer(who) w.ignoredPeers[who] = struct{}{} } @@ -207,164 +226,199 @@ func (w *workerPool) NumPeers() int { w.mtx.RLock() defer w.mtx.RUnlock() - return w.peers.Len() + return len(w.peers) } // Shutdown stops the worker pool and waits for all tasks to complete. func (w *workerPool) Shutdown() { w.cancel() + //close(w.resChan) w.wg.Wait() } -func (w *workerPool) executeBatch(tasks []Task, bID BatchID) { - batchResults := make(chan TaskResult, len(tasks)) - - for _, t := range tasks { - w.wg.Add(1) - go func(t Task) { - defer w.wg.Done() - w.executeTask(t, batchResults) - }(t) - } - - for { - select { - case <-w.ctx.Done(): - return - - case tr := <-batchResults: - if tr.Failed() { - w.handleFailedTask(tr, bID, batchResults) - } else { - w.handleSuccessfulTask(tr, bID) - } - - if w.batchCompleted(bID, len(tasks)) { - return - } - } - } -} - -func (w *workerPool) executeTask(task Task, ch chan TaskResult) { - if errors.Is(w.ctx.Err(), context.Canceled) { - logger.Tracef("[CANCELED] task=%s, shutting down", task.String()) - return - } - - who, err := w.reservePeer() - if errors.Is(err, ErrNoPeers) { - logger.Tracef("no peers available for task=%s", task.String()) - ch <- TaskResult{Task: task, Error: ErrNoPeers} - return - } - - logger.Infof("[EXECUTING] task=%s", task.String()) - - result, err := task.Do(who) - if err != nil { - logger.Tracef("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error()) - } else { - logger.Tracef("[FINISHED] task=%s peer=%s", task.String(), who) - } - - w.mtx.Lock() - w.peers.PushBack(who) - w.mtx.Unlock() - - ch <- TaskResult{ - Task: task, - Who: who, - Result: result, - Error: err, - Retries: 0, - } -} - -func (w *workerPool) reservePeer() (who peer.ID, err error) { - w.mtx.Lock() - defer w.mtx.Unlock() - - peerElement := w.peers.Front() - - if peerElement == nil { - return who, ErrNoPeers - } - - w.peers.Remove(peerElement) - return peerElement.Value.(peer.ID), nil -} - -func (w *workerPool) removePeer(who peer.ID) { - var toRemove *list.Element - for e := w.peers.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == who { - toRemove = e - break - } - } - - if toRemove != nil { - w.peers.Remove(toRemove) - } -} - -func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) { - w.mtx.Lock() - defer w.mtx.Unlock() - - tID := tr.Task.ID() - - if failedTr, ok := w.statuses[batchID].Failed[tID]; ok { - tr.Retries = failedTr.Retries + 1 - delete(w.statuses[batchID].Failed, tID) - } - - tr.Completed = true - w.statuses[batchID].Success[tID] = tr - logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove - w.resChan <- tr -} - -func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResults chan TaskResult) { - w.mtx.Lock() - defer w.mtx.Unlock() - - tID := tr.Task.ID() - - if oldTr, ok := w.statuses[batchID].Failed[tID]; ok { - // It is only considered a retry if the task was actually executed. - if !errors.Is(oldTr.Error, ErrNoPeers) { - if errors.Is(oldTr.Error, io.EOF) || errors.Is(oldTr.Error, network.ErrStreamReset) { - w.removePeer(oldTr.Who) - logger.Debugf("removed peer %s from the worker pool", oldTr.Who) - } - - tr.Retries = oldTr.Retries + 1 - tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries - } +//func (w *workerPool) executeBatch(tasks []Task, bID BatchID) { +// batchResults := make(chan TaskResult, len(tasks)) +// +// for _, t := range tasks { +// w.wg.Add(1) +// go func(t Task) { +// defer w.wg.Done() +// w.executeTask(t, batchResults) +// }(t) +// } +// +// for { +// select { +// case <-w.ctx.Done(): +// close(batchResults) +// return +// +// case tr := <-batchResults: +// if tr.Failed() { +// w.handleFailedTask(tr, bID, batchResults) +// } else { +// w.handleSuccessfulTask(tr, bID) +// } +// +// if w.batchCompleted(bID, len(tasks)) { +// return +// } +// } +// } +//} + +//func (w *workerPool) executeTask(task Task, ch chan TaskResult) { +// if errors.Is(w.ctx.Err(), context.Canceled) { +// logger.Infof("[CANCELED] task=%s, shutting down", task.String()) // TODO: change to debug lvl +// return +// } +// +// who, err := w.reservePeer() +// if errors.Is(err, ErrNoPeers) { +// logger.Infof("no peers available for task=%s", task.String()) // TODO: change to trace lvl +// ch <- TaskResult{Task: task, Error: ErrNoPeers} +// return +// } +// +// logger.Infof("[EXECUTING] task=%s", task.String()) +// +// result, err := task.Do(who) +// if err != nil { +// logger.Debugf("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error()) +// } else { +// logger.Debugf("[FINISHED] task=%s peer=%s", task.String(), who) +// } +// +// ch <- TaskResult{ +// Task: task, +// Who: who, +// Result: result, +// Error: err, +// Retries: 0, +// } +//} + +//func (w *workerPool) reservePeer() (who peer.ID, err error) { +// w.mtx.Lock() +// defer w.mtx.Unlock() +// +// peerElement := w.peers.Front() +// +// if peerElement == nil { +// return who, ErrNoPeers +// } +// +// w.peers.Remove(peerElement) +// return peerElement.Value.(peer.ID), nil +//} + +func (w *workerPool) addPeer(who peer.ID) error { + if _, ok := w.ignoredPeers[who]; ok { + return ErrPeerIgnored } - w.statuses[batchID].Failed[tID] = tr - - if tr.Completed { - logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove - w.resChan <- tr - return + if len(w.peers) < cap(w.peers) { + w.peers <- who } - - // retry task - w.wg.Add(1) - go func() { - defer w.wg.Done() - w.executeTask(tr.Task, batchResults) - }() + //for e := w.peers.Front(); e != nil; e = e.Next() { + // if e.Value.(peer.ID) == who { + // return nil + // } + //} + // + //w.peers.PushBack(who) + //logger.Tracef("peer added, total in the pool %d", w.peers.Len()) + return nil } -func (w *workerPool) batchCompleted(id BatchID, todo int) bool { - w.mtx.Lock() - defer w.mtx.Unlock() - - b, ok := w.statuses[id] - return !ok || b.Completed(todo) -} +//func (w *workerPool) removePeer(who peer.ID) { +// var toRemove *list.Element +// for e := w.peers.Front(); e != nil; e = e.Next() { +// if e.Value.(peer.ID) == who { +// toRemove = e +// break +// } +// } +// +// if toRemove != nil { +// w.peers.Remove(toRemove) +// } +//} + +//func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) { +// w.mtx.Lock() +// defer w.mtx.Unlock() +// +// tID := tr.Task.ID() +// +// if failedTr, ok := w.statuses[batchID].Failed[tID]; ok { +// tr.Retries = failedTr.Retries + 1 +// delete(w.statuses[batchID].Failed, tID) +// } +// +// tr.Completed = true +// w.statuses[batchID].Success[tID] = tr +// _ = w.addPeer(tr.Who) +// +// logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl +// w.resChan <- tr +//} +// +//func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResults chan TaskResult) { +// w.mtx.Lock() +// defer w.mtx.Unlock() +// +// if errors.Is(w.ctx.Err(), context.Canceled) { +// logger.Infof("handleFailedTask(): worker pool canceled, not handling failed task") // TODO: remove +// return +// } +// +// logger.Infof("handling failed task err: %v task: %s", tr.Error, tr.Task.ID()) // TODO: remove +// delayRetry := false +// tID := tr.Task.ID() +// +// // It is only considered a retry if the task was actually executed. +// if errors.Is(tr.Error, ErrNoPeers) { +// delayRetry = true +// } else { +// w.ignoredPeers[tr.Who] = struct{}{} +// +// if oldTr, ok := w.statuses[batchID].Failed[tID]; ok { +// tr.Retries = oldTr.Retries + 1 +// } +// } +// +// tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries +// w.statuses[batchID].Failed[tID] = tr +// +// if tr.Completed { +// logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl +// w.resChan <- tr +// return +// } +// +// // retry task +// w.wg.Add(1) +// go func() { +// defer w.wg.Done() +// if delayRetry { +// logger.Infof("delaying retry of task %s", tr.Task.String()) // TODO: remove +// timer := time.NewTimer(w.retryDelay) +// select { +// case <-timer.C: +// case <-w.ctx.Done(): +// logger.Infof("in gorouting, worker pool canceled, not retrying task") // TODO: remove +// return +// } +// } +// w.executeTask(tr.Task, batchResults) +// }() +//} +// +//func (w *workerPool) batchCompleted(id BatchID, todo int) bool { +// w.mtx.Lock() +// defer w.mtx.Unlock() +// +// b, ok := w.statuses[id] +// return !ok || b.Completed(todo) +//} diff --git a/dot/sync/worker_pool_test.go b/dot/sync/worker_pool_test.go index dac3f13b99..6b2634f7d8 100644 --- a/dot/sync/worker_pool_test.go +++ b/dot/sync/worker_pool_test.go @@ -55,6 +55,18 @@ func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) { return tasks, peers } +func makePool(maxRetries ...int) WorkerPool { + mr := 0 + if len(maxRetries) > 0 { + mr = maxRetries[0] + } + + return NewWorkerPool(WorkerPoolConfig{ + MaxRetries: mr, + NoPeersRetryDelay: time.Millisecond * 10, + }) +} + func waitForCompletion(wp WorkerPool, numTasks int) { resultsReceived := 0 @@ -73,7 +85,7 @@ func TestWorkerPoolHappyPath(t *testing.T) { var setup = func() (WorkerPool, []Task) { tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) @@ -120,7 +132,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("accepts_batch_without_any_peers", func(t *testing.T) { tasks, _ := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() wp.SubmitBatch(tasks) @@ -129,7 +141,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) { tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() assert.NoError(t, wp.AddPeer(peers[0])) assert.NoError(t, wp.AddPeer(peers[1])) @@ -145,7 +157,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) { t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) { _, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) @@ -178,7 +190,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) { failTwice.err = taskErr failTwice.succeedAfter = 2 - wp = NewWorkerPool(WorkerPoolConfig{MaxRetries: maxRetries}) + wp = makePool(maxRetries) for _, who := range peers { err := wp.AddPeer(who) assert.NoError(t, err) @@ -232,7 +244,7 @@ func TestWorkerPoolMultipleBatches(t *testing.T) { b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks) peers := append(b1Peers, b2Peers...) - wp := NewWorkerPool(WorkerPoolConfig{}) + wp := makePool() for _, who := range peers { err := wp.AddPeer(who) assert.NoError(t, err)