Skip to content

Commit

Permalink
Fix DRR scheduler deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed May 15, 2024
1 parent 44f0f3c commit 2b5f4fb
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 44 deletions.
2 changes: 2 additions & 0 deletions pkg/protocol/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func newBlocks(protocol *Protocol) *Blocks {
b.SendResponse(block.ModelBlock())
}
}).Unhook,
// TODO: do we need to send the block to peers when we skip it?
// It should have been accepted already anyway.
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if !chain.WarpSyncMode.Get() {
b.SendResponse(block.ModelBlock())
Expand Down
11 changes: 11 additions & 0 deletions pkg/protocol/engine/blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ func (b *Block) IsAccepted() bool {
return b.accepted.Get()
}

// IsCommitted reports whether the Block has been accepted and more time than the
// maximum committable age has elapsed since its issuing time.
//
// The maximum committable age is given in slots by the block's protocol parameters;
// it is converted to wall-clock time by multiplying with the slot duration in seconds.
func (b *Block) IsCommitted() bool {
	// a block that was never accepted cannot be committed.
	if accepted := b.IsAccepted(); !accepted {
		return false
	}

	api := b.ProtocolBlock().API

	// slots * seconds-per-slot gives the threshold in seconds.
	ageInSlots := int64(api.ProtocolParameters().MaxCommittableAge())
	thresholdSeconds := ageInSlots * api.TimeProvider().SlotDurationSeconds()
	threshold := time.Duration(thresholdSeconds) * time.Second

	elapsed := time.Since(b.IssuingTime())

	return elapsed > threshold
}

// SetAccepted sets the Block as accepted.
func (b *Block) SetAccepted() (wasUpdated bool) {
return !b.accepted.Set(true)
Expand Down
34 changes: 34 additions & 0 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,40 @@ func (b *BasicBuffer) Next() *IssuerQueue {
return nil
}

// ForEach calls consumer once for every IssuerQueue held in the BasicBuffer, in ring
// order beginning at the current ring position. Ring entries whose value is not an
// *IssuerQueue are ignored. Iteration stops as soon as consumer returns false.
//
// All queues are collected into a snapshot before consumer is invoked for the first
// time, so consumer never observes the ring while it is being walked.
// NOTE(review): presumably this is what makes it safe for consumer to remove issuers
// from the buffer — confirm against the removal code.
//
// The walk uses a local cursor; the buffer's current ring position is not modified.
func (b *BasicBuffer) ForEach(consumer func(*IssuerQueue) bool) {
	if b.ring == nil {
		return
	}

	// snapshot of the IssuerQueues found in the ring
	var queues []*IssuerQueue

	// walk the ring exactly once, starting (and ending) at the current position
	start := b.ring
	for cursor := start; ; {
		if issuerQueue, isIQ := cursor.Value.(*IssuerQueue); isIQ {
			queues = append(queues, issuerQueue)
		}

		cursor = cursor.Next()

		// back at the starting element: every entry has been visited
		if cursor == start {
			break
		}
	}

	// hand the snapshot to the consumer until it asks to stop
	for _, queue := range queues {
		if !consumer(queue) {
			return
		}
	}
}

// Current returns the current IssuerQueue in round-robin order.
func (b *BasicBuffer) Current() *IssuerQueue {
if b.ring == nil {
Expand Down
77 changes: 33 additions & 44 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,8 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
if len(s.basicBuffer.blockChan) > 0 {
return
}
start := s.basicBuffer.Current()
// no blocks submitted
if start == nil {
return
}

rounds, schedulingIssuer := s.selectIssuer(start, slot)
rounds, schedulingIssuer := s.selectIssuer(slot)

// if there is no issuer with a ready block, we cannot schedule anything
if schedulingIssuer == nil {
Expand All @@ -417,26 +412,20 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {

if rounds > 0 {
// increment every issuer's deficit for the required number of rounds
for q := start; ; {
issuerID := q.IssuerID()
s.basicBuffer.ForEach(func(queue *IssuerQueue) bool {
issuerID := queue.IssuerID()

if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

q = s.basicBuffer.Current()
} else {
q = s.basicBuffer.Next()
}
if q == nil {
return
}
if q == start {
break
}
}

return true
})
}

// increment the deficit for all issuers before schedulingIssuer one more time
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
Expand All @@ -462,29 +451,34 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {

return
}

// schedule the block
s.basicBuffer.blockChan <- block
}

func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Deficit, *IssuerQueue) {
rounds := Deficit(math.MaxInt64)
func (s *Scheduler) selectIssuer(slot iotago.SlotIndex) (Deficit, *IssuerQueue) {
minRounds := Deficit(math.MaxInt64)
var schedulingIssuer *IssuerQueue

for q := start; ; {
block := q.Front()
var issuerRemoved bool
s.basicBuffer.ForEach(func(queue *IssuerQueue) bool {
block := queue.Front()

for block != nil && time.Now().After(block.IssuingTime()) {
currentAPI := s.apiProvider.CommittedAPI()
if block.IsAccepted() && time.Since(block.IssuingTime()) > time.Duration(currentAPI.TimeProvider().SlotDurationSeconds()*int64(currentAPI.ProtocolParameters().MaxCommittableAge())) {
// if the block is already committed, we can skip it.
if block.IsCommitted() {
if block.SetSkipped() {
// the block was accepted and therefore committed already, so we can mark the children as ready.
s.updateChildrenWithoutLocking(block)
s.events.BlockSkipped.Trigger(block)
}

s.basicBuffer.PopFront()
// remove the skipped block from the queue
_ = queue.PopFront()

block = q.Front()
// take the next block in the queue
block = queue.Front()

// continue to check the next block in the queue
continue
}

Expand All @@ -497,13 +491,14 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def
}

remainingDeficit := s.deficitFromWork(block.WorkScore()) - deficit

// calculate how many rounds we need to skip to accumulate enough deficit.
quantum, err := s.quantumFunc(issuerID, slot)
if err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to retrieve quantum for issuerID %s in slot %d during issuer selection", issuerID, slot))

// if the quantum can't be retrieved, we need to remove this issuer.
s.removeIssuer(issuerID, err)
issuerRemoved = true

break
}
Expand All @@ -513,31 +508,25 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def
numerator = math.MaxInt64
}

r, err := safemath.SafeDiv(numerator, quantum)
rounds, err := safemath.SafeDiv(numerator, quantum)
if err != nil {
panic(err)
}

// find the first issuer that will be allowed to schedule a block
if r < rounds {
rounds = r
schedulingIssuer = q
if rounds < minRounds {
minRounds = rounds
schedulingIssuer = queue
}

break
}

if issuerRemoved {
q = s.basicBuffer.Current()
} else {
q = s.basicBuffer.Next()
}
if q == start || q == nil {
break
}
}
// we need to go through all issuers once
return true
})

return rounds, schedulingIssuer
return minRounds, schedulingIssuer
}

func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocol/engine/tipmanager/v1/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func NewProvider() module.Provider[*engine.Engine, tipmanager.TipManager] {
tipWorker := e.Workers.CreatePool("AddTip", workerpool.WithWorkerCount(2))

e.Events.Scheduler.BlockScheduled.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker))

// the tipmanager needs to know about all the blocks that passed the scheduler
e.Events.Scheduler.BlockSkipped.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker))
e.Events.Evict.Hook(t.Evict)

Expand Down

0 comments on commit 2b5f4fb

Please sign in to comment.