Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
haikoschol committed Nov 7, 2024
1 parent dc9040b commit 63a4cbe
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 575 deletions.
11 changes: 0 additions & 11 deletions dot/sync/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.currentStrategy = currentStrategy
svc.defaultStrategy = defaultStrategy

wpCapacity := currentStrategy.NumOfTasks()
if defaultStrategy != nil {
wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks())
}
wpCapacity *= 2 // add some buffer

svc.workerPool = NewWorkerPool(WorkerPoolConfig{
Capacity: wpCapacity,
MaxRetries: UnlimitedRetries,
})
}
}

Expand Down
12 changes: 7 additions & 5 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
}

// This is safe as long as we are the only goroutine reading from the channel.
for len(results) > 0 {
for result := range results {
readyBlocks := make([][]*types.BlockData, 0)
result := <-results
//logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove
//result := <-results
//logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove
repChange, ignorePeer, validResp := validateResult(result, f.badBlocks)

if repChange != nil {
Expand Down Expand Up @@ -440,9 +442,9 @@ type RequestResponseData struct {
func validateResult(result TaskResult, badBlocks []string) (repChange *Change,
blockPeer bool, validRes *RequestResponseData) {

if !result.Completed {
return
}
//if !result.Completed {
// return
//}

task, ok := result.Task.(*syncTask)
if !ok {
Expand Down
9 changes: 3 additions & 6 deletions dot/sync/fullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ func TestFullSyncProcess(t *testing.T) {
messages.BootstrapRequestData, messages.Ascending),
requestMaker: requestMaker,
},
Completed: true,
Result: fstTaskBlockResponse,
Result: fstTaskBlockResponse,
},
// there is gap from 11 -> 128
// second task
Expand All @@ -218,8 +217,7 @@ func TestFullSyncProcess(t *testing.T) {
messages.BootstrapRequestData, messages.Ascending),
requestMaker: requestMaker,
},
Completed: true,
Result: sndTaskBlockResponse,
Result: sndTaskBlockResponse,
},
}

Expand Down Expand Up @@ -292,8 +290,7 @@ func TestFullSyncProcess(t *testing.T) {
request: expectedAncestorRequest,
requestMaker: requestMaker,
},
Completed: true,
Result: ancestorSearchResponse,
Result: ancestorSearchResponse,
}

done, _, _, err = fs.Process(results)
Expand Down
41 changes: 17 additions & 24 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
const (
waitPeersDefaultTimeout = 10 * time.Second
minPeersDefault = 1
maxTaskRetries = 5
)

var (
Expand Down Expand Up @@ -93,7 +92,6 @@ type Strategy interface {
Process(results <-chan TaskResult) (done bool, repChanges []Change, blocks []peer.ID, err error)
ShowMetrics()
IsSynced() bool
NumOfTasks() int
}

type SyncService struct {
Expand Down Expand Up @@ -121,7 +119,7 @@ func NewSyncService(cfgs ...ServiceConfig) *SyncService {
waitPeersDuration: waitPeersDefaultTimeout,
stopCh: make(chan struct{}),
seenBlockSyncRequests: lrucache.NewLRUCache[common.Hash, uint](100),
workerPool: nil,
workerPool: NewWorkerPool(),
}

for _, cfg := range cfgs {
Expand All @@ -138,7 +136,7 @@ func (s *SyncService) waitWorkers() {
}

for {
total := s.workerPool.NumPeers()
total := s.workerPool.IdlePeers()
if total >= s.minPeers {
return
}
Expand Down Expand Up @@ -175,11 +173,7 @@ func (s *SyncService) Stop() error {

func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
logger.Infof("receiving a block announce handshake from %s", from.String())
logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove
if err := s.workerPool.AddPeer(from); err != nil {
logger.Warnf("failed to add peer to worker pool: %s", err)
return err
}
s.workerPool.AddPeer(from)

s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -203,9 +197,7 @@ func (s *SyncService) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnoun
return nil
}

func (s *SyncService) OnConnectionClosed(who peer.ID) {
logger.Tracef("removing peer worker: %s", who.String())
s.workerPool.RemovePeer(who)
func (s *SyncService) OnConnectionClosed(_ peer.ID) {
}

func (s *SyncService) IsSynced() bool {
Expand Down Expand Up @@ -268,26 +260,27 @@ func (s *SyncService) runStrategy() {
logger.Infof(
"🚣 currently syncing, %d peers connected, %d peers in the worker pool, finalized #%d (%s), best #%d (%s)",
len(s.network.AllConnectedPeersIDs()),
s.workerPool.NumPeers(),
s.workerPool.IdlePeers(),
finalisedHeader.Number,
finalisedHeader.Hash().Short(),
bestBlockHeader.Number,
bestBlockHeader.Hash().Short(),
)

if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() {
tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}
tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}

logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}
logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}

_ = s.workerPool.SubmitBatch(tasks)
if err := s.workerPool.SubmitBatch(tasks); err != nil {
logger.Debugf("unable to submit tasks to worker pool: %v", err)
return
}

done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
Expand Down
Loading

0 comments on commit 63a4cbe

Please sign in to comment.