Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DRR scheduler deadlock #972

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/protocol/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func newBlocks(protocol *Protocol) *Blocks {
b.SendResponse(block.ModelBlock())
}
}).Unhook,
// TODO: do we need to send the block to peers when we skip it?
// It should have been accepted already anyway.
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if !chain.WarpSyncMode.Get() {
b.SendResponse(block.ModelBlock())
Expand Down
11 changes: 11 additions & 0 deletions pkg/protocol/engine/blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ func (b *Block) IsAccepted() bool {
return b.accepted.Get()
}

// IsCommitted reports whether the Block is accepted and its issuing time lies further
// in the past than MaxCommittableAge slots (converted to wall-clock time via the slot duration).
func (b *Block) IsCommitted() bool {
	// a block that is not accepted is never considered committed
	if accepted := b.IsAccepted(); !accepted {
		return false
	}

	api := b.ProtocolBlock().API

	// MaxCommittableAge is expressed in slots; multiply by the slot length (seconds) to get seconds.
	maxCommittableAgeSlots := int64(api.ProtocolParameters().MaxCommittableAge())
	maxCommittableAgeSeconds := maxCommittableAgeSlots * api.TimeProvider().SlotDurationSeconds()

	age := time.Since(b.IssuingTime())

	return age > time.Second*time.Duration(maxCommittableAgeSeconds)
}

// SetAccepted sets the Block as accepted.
func (b *Block) SetAccepted() (wasUpdated bool) {
return !b.accepted.Set(true)
Expand Down
34 changes: 34 additions & 0 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,40 @@ func (b *BasicBuffer) Next() *IssuerQueue {
return nil
}

// ForEach calls consumer once for every IssuerQueue stored in the BasicBuffer's ring,
// starting at the current ring position and following Next() order.
// Iteration stops early as soon as consumer returns false.
//
// The queues are collected into a snapshot before consumer is invoked for the first time,
// so the set of visited queues is fixed up front. The traversal uses a local cursor and
// never reassigns b.ring, so the buffer's current position is left untouched by ForEach itself.
func (b *BasicBuffer) ForEach(consumer func(*IssuerQueue) bool) {
	// An empty buffer has no ring to walk.
	if b.ring == nil {
		return
	}

	// Snapshot of the IssuerQueues found in the ring.
	var queues []*IssuerQueue

	// Walk the ring exactly once with a local cursor instead of advancing b.ring.
	start := b.ring
	for cursor := start; ; {
		// Ring values that are not IssuerQueues are skipped.
		if issuerQueue, isIQ := cursor.Value.(*IssuerQueue); isIQ {
			queues = append(queues, issuerQueue)
		}

		cursor = cursor.Next()

		// Back at the starting element: every position has been visited.
		if cursor == start {
			break
		}
	}

	// Apply the consumer to the snapshot; a false return aborts the iteration.
	for _, queue := range queues {
		if !consumer(queue) {
			return
		}
	}
}

// Current returns the current IssuerQueue in round-robin order.
func (b *BasicBuffer) Current() *IssuerQueue {
if b.ring == nil {
Expand Down
77 changes: 33 additions & 44 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,8 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
if len(s.basicBuffer.blockChan) > 0 {
return
}
start := s.basicBuffer.Current()
// no blocks submitted
if start == nil {
return
}

rounds, schedulingIssuer := s.selectIssuer(start, slot)
rounds, schedulingIssuer := s.selectIssuer(slot)

// if there is no issuer with a ready block, we cannot schedule anything
if schedulingIssuer == nil {
Expand All @@ -417,26 +412,20 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {

if rounds > 0 {
// increment every issuer's deficit for the required number of rounds
for q := start; ; {
issuerID := q.IssuerID()
s.basicBuffer.ForEach(func(queue *IssuerQueue) bool {
issuerID := queue.IssuerID()

if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

q = s.basicBuffer.Current()
} else {
q = s.basicBuffer.Next()
}
if q == nil {
return
}
if q == start {
break
}
}

return true
})
}

// increment the deficit for all issuers before schedulingIssuer one more time
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
Expand All @@ -462,29 +451,34 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {

return
}

// schedule the block
s.basicBuffer.blockChan <- block
}

func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Deficit, *IssuerQueue) {
rounds := Deficit(math.MaxInt64)
func (s *Scheduler) selectIssuer(slot iotago.SlotIndex) (Deficit, *IssuerQueue) {
minRounds := Deficit(math.MaxInt64)
var schedulingIssuer *IssuerQueue

for q := start; ; {
block := q.Front()
var issuerRemoved bool
s.basicBuffer.ForEach(func(queue *IssuerQueue) bool {
block := queue.Front()

for block != nil && time.Now().After(block.IssuingTime()) {
currentAPI := s.apiProvider.CommittedAPI()
if block.IsAccepted() && time.Since(block.IssuingTime()) > time.Duration(currentAPI.TimeProvider().SlotDurationSeconds()*int64(currentAPI.ProtocolParameters().MaxCommittableAge())) {
// if the block is already committed, we can skip it.
if block.IsCommitted() {
if block.SetSkipped() {
// the block was accepted and therefore committed already, so we can mark the children as ready.
s.updateChildrenWithoutLocking(block)
s.events.BlockSkipped.Trigger(block)
}

s.basicBuffer.PopFront()
// remove the skipped block from the queue
_ = queue.PopFront()

block = q.Front()
// take the next block in the queue
block = queue.Front()

// continue to check the next block in the queue
continue
}

Expand All @@ -497,13 +491,14 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def
}

remainingDeficit := s.deficitFromWork(block.WorkScore()) - deficit

// calculate how many rounds we need to skip to accumulate enough deficit.
quantum, err := s.quantumFunc(issuerID, slot)
if err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to retrieve quantum for issuerID %s in slot %d during issuer selection", issuerID, slot))

// if the quantum can't be retrieved, we need to remove this issuer.
s.removeIssuer(issuerID, err)
issuerRemoved = true

break
}
Expand All @@ -513,31 +508,25 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def
numerator = math.MaxInt64
}

r, err := safemath.SafeDiv(numerator, quantum)
rounds, err := safemath.SafeDiv(numerator, quantum)
if err != nil {
panic(err)
}

// find the first issuer that will be allowed to schedule a block
if r < rounds {
rounds = r
schedulingIssuer = q
if rounds < minRounds {
minRounds = rounds
schedulingIssuer = queue
}

break
}

if issuerRemoved {
q = s.basicBuffer.Current()
} else {
q = s.basicBuffer.Next()
}
if q == start || q == nil {
break
}
}
// we need to go through all issuers once
return true
})

return rounds, schedulingIssuer
return minRounds, schedulingIssuer
}

func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocol/engine/tipmanager/v1/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func NewProvider() module.Provider[*engine.Engine, tipmanager.TipManager] {
tipWorker := e.Workers.CreatePool("AddTip", workerpool.WithWorkerCount(2))

e.Events.Scheduler.BlockScheduled.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker))

// the tipmanager needs to know about all the blocks that passed the scheduler
e.Events.Scheduler.BlockSkipped.Hook(lo.Void(t.AddBlock), event.WithWorkerPool(tipWorker))
e.Events.Evict.Hook(t.Evict)

Expand Down
Loading