Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
haikoschol committed Nov 7, 2024
1 parent dc9040b commit 6694cc3
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 274 deletions.
17 changes: 10 additions & 7 deletions dot/sync/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,25 @@ import (
"time"
)

const defaultNoPeersRetryDelay = time.Second * 10

type ServiceConfig func(svc *SyncService)

func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.currentStrategy = currentStrategy
svc.defaultStrategy = defaultStrategy

wpCapacity := currentStrategy.NumOfTasks()
if defaultStrategy != nil {
wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks())
}
wpCapacity *= 2 // add some buffer
wpCapacity := 100 // TODO set to MaxPeers config // currentStrategy.NumOfTasks()
//if defaultStrategy != nil {
// wpCapacity = max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks())
//}
//wpCapacity *= 2 // add some buffer

svc.workerPool = NewWorkerPool(WorkerPoolConfig{
Capacity: wpCapacity,
MaxRetries: UnlimitedRetries,
Capacity: wpCapacity,
MaxRetries: UnlimitedRetries,
NoPeersRetryDelay: defaultNoPeersRetryDelay,
})
}
}
Expand Down
17 changes: 12 additions & 5 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,16 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
}

// This is safe as long as we are the only goroutine reading from the channel.
for len(results) > 0 {
for result := range results {
if result.Error != nil {
f.requestQueue.PushBack(result.Task.(*syncTask).request) // TODO: check cast
continue
}

readyBlocks := make([][]*types.BlockData, 0)
result := <-results
//logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove
//result := <-results
//logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove
repChange, ignorePeer, validResp := validateResult(result, f.badBlocks)

if repChange != nil {
Expand Down Expand Up @@ -440,9 +447,9 @@ type RequestResponseData struct {
func validateResult(result TaskResult, badBlocks []string) (repChange *Change,
blockPeer bool, validRes *RequestResponseData) {

if !result.Completed {
return
}
//if !result.Completed {
// return
//}

task, ok := result.Task.(*syncTask)
if !ok {
Expand Down
33 changes: 18 additions & 15 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
const (
waitPeersDefaultTimeout = 10 * time.Second
minPeersDefault = 1
maxTaskRetries = 5
)

var (
Expand Down Expand Up @@ -175,7 +174,6 @@ func (s *SyncService) Stop() error {

func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
logger.Infof("receiving a block announce handshake from %s", from.String())
logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove
if err := s.workerPool.AddPeer(from); err != nil {
logger.Warnf("failed to add peer to worker pool: %s", err)
return err
Expand Down Expand Up @@ -204,8 +202,8 @@ func (s *SyncService) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnoun
}

func (s *SyncService) OnConnectionClosed(who peer.ID) {
logger.Tracef("removing peer worker: %s", who.String())
s.workerPool.RemovePeer(who)
//logger.Tracef("removing peer worker: %s", who.String())
//s.workerPool.RemovePeer(who)
}

func (s *SyncService) IsSynced() bool {
Expand Down Expand Up @@ -275,20 +273,25 @@ func (s *SyncService) runStrategy() {
bestBlockHeader.Hash().Short(),
)

if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() {
tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}
//if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() {
tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}

logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}
logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}

_ = s.workerPool.SubmitBatch(tasks)
if err := s.workerPool.SubmitBatch(tasks); err != nil {
logger.Debugf("unable to submit tasks to worker pool: %v", err)
return
}
//} else {
// logger.Info("SyncService.runStrategy(): worker pool is at capacity, not asking for more tasks") // TODO: remove
//}

done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
if err != nil {
Expand Down
Loading

0 comments on commit 6694cc3

Please sign in to comment.