diff --git a/pkg/protocol/blocks.go b/pkg/protocol/blocks.go index 537875fc3..664811068 100644 --- a/pkg/protocol/blocks.go +++ b/pkg/protocol/blocks.go @@ -69,6 +69,8 @@ func newBlocks(protocol *Protocol) *Blocks { b.SendResponse(block.ModelBlock()) } }).Unhook, + // TODO: do we need to send the block to peers when we skip it? + // It should have been accepted already anyway. engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { if !chain.WarpSyncMode.Get() { b.SendResponse(block.ModelBlock()) diff --git a/pkg/protocol/engine/blocks/block.go b/pkg/protocol/engine/blocks/block.go index 50891b024..54c20ae0d 100644 --- a/pkg/protocol/engine/blocks/block.go +++ b/pkg/protocol/engine/blocks/block.go @@ -466,6 +466,17 @@ func (b *Block) IsAccepted() bool { return b.accepted.Get() } +// IsCommitted returns true if the Block was accepted and the max committable age has passed. +func (b *Block) IsCommitted() bool { + if !b.IsAccepted() { + return false + } + + maxCommittableAgeDuration := time.Second * time.Duration(int64(b.ProtocolBlock().API.ProtocolParameters().MaxCommittableAge())*b.ProtocolBlock().API.TimeProvider().SlotDurationSeconds()) + + return time.Since(b.IssuingTime()) > maxCommittableAgeDuration +} + // SetAccepted sets the Block as accepted. func (b *Block) SetAccepted() (wasUpdated bool) { return !b.accepted.Set(true) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go index f90a4458b..dac3020dc 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go @@ -212,6 +212,40 @@ func (b *BasicBuffer) Next() *IssuerQueue { return nil } +// ForEach applies a consumer function to each IssuerQueue in the BasicBuffer. +func (b *BasicBuffer) ForEach(consumer func(*IssuerQueue) bool) { + if b.ring == nil { + return + } + + // Create a temporary slice to hold the IssuerQueues + var queues []*IssuerQueue + + // Start at the current ring position + start := b.ring + + for { + if issuerQueue, isIQ := b.ring.Value.(*IssuerQueue); isIQ { + queues = append(queues, issuerQueue) + } + + // Move to the next position in the ring + b.ring = b.ring.Next() + + // If we've looped back to the start, break out of the loop + if b.ring == start { + break + } + } + + // Apply the consumer function to each IssuerQueue + for _, queue := range queues { + if !consumer(queue) { + return + } + } +} + // Current returns the current IssuerQueue in round-robin order. func (b *BasicBuffer) Current() *IssuerQueue { if b.ring == nil { diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index a8dd7c28c..8d802e4a1 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -402,13 +402,8 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { if len(s.basicBuffer.blockChan) > 0 { return } - start := s.basicBuffer.Current() - // no blocks submitted - if start == nil { - return - } - rounds, schedulingIssuer := s.selectIssuer(start, slot) + rounds, schedulingIssuer := s.selectIssuer(slot) // if there is no issuer with a ready block, we cannot schedule anything if schedulingIssuer == nil { @@ -417,26 +412,20 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { if rounds > 0 { // increment every issuer's deficit for the required number of rounds - for q := start; ; { - issuerID := q.IssuerID() + s.basicBuffer.ForEach(func(queue *IssuerQueue) bool { + issuerID := queue.IssuerID() + if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot)) s.removeIssuer(issuerID, err) - - q = s.basicBuffer.Current() - } else { - q = s.basicBuffer.Next() - } - if q == nil { - return } - if q == start { - break - } - } + + return true + }) } + // increment the deficit for all issuers before schedulingIssuer one more time - for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() { + for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() { issuerID := q.IssuerID() newDeficit, err := s.incrementDeficit(issuerID, 1, slot) if err != nil { @@ -462,29 +451,34 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { return } + + // schedule the block s.basicBuffer.blockChan <- block } -func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Deficit, *IssuerQueue) { - rounds := Deficit(math.MaxInt64) +func (s *Scheduler) selectIssuer(slot iotago.SlotIndex) (Deficit, *IssuerQueue) { + minRounds := Deficit(math.MaxInt64) var schedulingIssuer *IssuerQueue - for q := start; ; { - block := q.Front() - var issuerRemoved bool + s.basicBuffer.ForEach(func(queue *IssuerQueue) bool { + block := queue.Front() for block != nil && time.Now().After(block.IssuingTime()) { - currentAPI := s.apiProvider.CommittedAPI() - if block.IsAccepted() && time.Since(block.IssuingTime()) > time.Duration(currentAPI.TimeProvider().SlotDurationSeconds()*int64(currentAPI.ProtocolParameters().MaxCommittableAge())) { + // if the block is already committed, we can skip it. + if block.IsCommitted() { if block.SetSkipped() { + // the block was accepted and therefore committed already, so we can mark the children as ready. s.updateChildrenWithoutLocking(block) s.events.BlockSkipped.Trigger(block) } - s.basicBuffer.PopFront() + // remove the skipped block from the queue + _ = queue.PopFront() - block = q.Front() + // take the next block in the queue + block = queue.Front() + // continue to check the next block in the queue continue } @@ -497,13 +491,14 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def } remainingDeficit := s.deficitFromWork(block.WorkScore()) - deficit + // calculate how many rounds we need to skip to accumulate enough deficit. quantum, err := s.quantumFunc(issuerID, slot) if err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to retrieve quantum for issuerID %s in slot %d during issuer selection", issuerID, slot)) + // if quantum, can't be retrieved, we need to remove this issuer. s.removeIssuer(issuerID, err) - issuerRemoved = true break } @@ -513,31 +508,25 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def numerator = math.MaxInt64 } - r, err := safemath.SafeDiv(numerator, quantum) + rounds, err := safemath.SafeDiv(numerator, quantum) if err != nil { panic(err) } // find the first issuer that will be allowed to schedule a block - if r < rounds { - rounds = r - schedulingIssuer = q + if rounds < minRounds { + minRounds = rounds + schedulingIssuer = queue } break } - if issuerRemoved { - q = s.basicBuffer.Current() - } else { - q = s.basicBuffer.Next() - } - if q == start || q == nil { - break - } - } + // we need to go through all issuers once + return true + }) - return rounds, schedulingIssuer + return minRounds, schedulingIssuer } func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { diff --git a/pkg/protocol/engine/tipmanager/v1/provider.go b/pkg/protocol/engine/tipmanager/v1/provider.go index 797c54911..314ae29cc 100644 --- a/pkg/protocol/engine/tipmanager/v1/provider.go +++ b/pkg/protocol/engine/tipmanager/v1/provider.go @@ -20,6 +20,8 @@ func NewProvider() module.Provider[*engine.Engine, tipmanager.TipManager] { tipWorker := e.Workers.CreatePool("AddTip", workerpool.WithWorkerCount(2)) e.Events.Scheduler.BlockScheduled.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker)) + + // the tipmanager needs to know about all the blocks that passed the scheduler e.Events.Scheduler.BlockSkipped.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker)) e.Events.Evict.Hook(t.Evict)