diff --git a/dot/sync/configuration.go b/dot/sync/configuration.go index 646c77ce87..b215274c16 100644 --- a/dot/sync/configuration.go +++ b/dot/sync/configuration.go @@ -3,7 +3,9 @@ package sync -import "time" +import ( + "time" +) type ServiceConfig func(svc *SyncService) @@ -11,13 +13,23 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig { return func(svc *SyncService) { svc.currentStrategy = currentStrategy svc.defaultStrategy = defaultStrategy + + wpCapacity := currentStrategy.NumOfTasks() + if defaultStrategy != nil { + wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks()) + } + wpCapacity *= 2 // add some buffer + + svc.workerPool = NewWorkerPool(WorkerPoolConfig{ + MaxRetries: maxTaskRetries, + Capacity: wpCapacity, + }) } } func WithNetwork(net Network) ServiceConfig { return func(svc *SyncService) { svc.network = net - //svc.workerPool = newSyncWorkerPool(net) } } diff --git a/dot/sync/fullsync.go b/dot/sync/fullsync.go index 8a004abc56..317c649cf9 100644 --- a/dot/sync/fullsync.go +++ b/dot/sync/fullsync.go @@ -428,6 +428,10 @@ func (f *FullSyncStrategy) IsSynced() bool { return uint32(highestBlock)+messages.MaxBlocksInResponse >= f.peers.getTarget() //nolint:gosec } +func (f *FullSyncStrategy) NumOfTasks() int { + return f.numOfTasks +} + type RequestResponseData struct { req *messages.BlockRequestMessage responseData []*types.BlockData diff --git a/dot/sync/service.go b/dot/sync/service.go index 859e6bf0dc..57814e6025 100644 --- a/dot/sync/service.go +++ b/dot/sync/service.go @@ -93,6 +93,7 @@ type Strategy interface { Process(results <-chan TaskResult) (done bool, repChanges []Change, blocks []peer.ID, err error) ShowMetrics() IsSynced() bool + NumOfTasks() int } type SyncService struct { @@ -120,11 +121,7 @@ func NewSyncService(cfgs ...ServiceConfig) *SyncService { waitPeersDuration: waitPeersDefaultTimeout, stopCh: make(chan struct{}), seenBlockSyncRequests: lrucache.NewLRUCache[common.Hash, uint](100), - workerPool: NewWorkerPool(WorkerPoolConfig{ - MaxRetries: maxTaskRetries, - // TODO: This should depend on the actual configuration of the currently used sync strategy. - Capacity: defaultNumOfTasks * 10, - }), + workerPool: nil, } for _, cfg := range cfgs { diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go index f91160c411..c406f054d4 100644 --- a/dot/sync/worker_pool.go +++ b/dot/sync/worker_pool.go @@ -35,7 +35,7 @@ type TaskResult struct { Completed bool Result Result Error error - Retries uint + Retries int Who peer.ID } @@ -82,15 +82,15 @@ type WorkerPool interface { } type WorkerPoolConfig struct { - Capacity uint - MaxRetries uint + Capacity int + MaxRetries int } // NewWorkerPool creates a new worker pool with the given configuration. func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool { ctx, cancel := context.WithCancel(context.Background()) - if cfg.Capacity == 0 { + if cfg.Capacity <= 0 { cfg.Capacity = defaultWorkerPoolCapacity } @@ -108,7 +108,7 @@ type workerPool struct { mtx sync.RWMutex wg sync.WaitGroup - maxRetries uint + maxRetries int peers list.List ignoredPeers map[peer.ID]struct{} statuses map[BatchID]BatchStatus diff --git a/dot/sync/worker_pool_test.go b/dot/sync/worker_pool_test.go index 676b787e78..bea4eddc76 100644 --- a/dot/sync/worker_pool_test.go +++ b/dot/sync/worker_pool_test.go @@ -94,7 +94,7 @@ func TestWorkerPoolHappyPath(t *testing.T) { result := <-wp.Results() assert.True(t, result.Completed) assert.False(t, result.Failed()) - assert.Equal(t, uint(0), result.Retries) + assert.Equal(t, 0, result.Retries) results = append(results, result) if len(results) == numTasks { @@ -172,7 +172,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) { numTasks := 3 taskErr := errors.New("kaput") - setup := func(maxRetries uint) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) { + setup := func(maxRetries int) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) { tasks, peers := makeTasksAndPeers(numTasks, 0) failOnce = tasks[1].(*mockTask) @@ -206,10 +206,10 @@ func TestWorkerPoolTaskFailures(t *testing.T) { assert.Equal(t, 0, len(status.Failed)) assert.Nil(t, status.Failed[failOnce.ID()].Error) - assert.Equal(t, uint(1), status.Success[failOnce.ID()].Retries) + assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) assert.Nil(t, status.Failed[failTwice.ID()].Error) - assert.Equal(t, uint(2), status.Success[failTwice.ID()].Retries) + assert.Equal(t, 2, status.Success[failTwice.ID()].Retries) }) t.Run("honours_max_retries", func(t *testing.T) { @@ -223,10 +223,10 @@ func TestWorkerPoolTaskFailures(t *testing.T) { assert.Equal(t, 1, len(status.Failed)) assert.Nil(t, status.Failed[failOnce.ID()].Error) - assert.Equal(t, uint(1), status.Success[failOnce.ID()].Retries) + assert.Equal(t, 1, status.Success[failOnce.ID()].Retries) assert.ErrorIs(t, taskErr, status.Failed[failTwice.ID()].Error) - assert.Equal(t, uint(1), status.Failed[failTwice.ID()].Retries) + assert.Equal(t, 1, status.Failed[failTwice.ID()].Retries) }) }