Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler sizes and shutdown #947

Merged
merged 7 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 31 additions & 48 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package drr

import (
"container/ring"
"fmt"
"math"
"time"

Expand All @@ -20,8 +21,9 @@ import (
type BasicBuffer struct {
activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
size atomic.Int64

readyBlocksCount atomic.Int64
totalBlocksCount atomic.Int64

tokenBucket float64
lastScheduleTime time.Time
Expand Down Expand Up @@ -57,11 +59,6 @@ func (b *BasicBuffer) Clear() {
})
}

// Size returns the total number of blocks in BasicBuffer.
func (b *BasicBuffer) Size() int {
return int(b.size.Load())
}

// IssuerQueue returns the queue for the corresponding issuer.
func (b *BasicBuffer) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, exists := b.activeIssuers.Get(issuerID)
Expand Down Expand Up @@ -97,8 +94,25 @@ func (b *BasicBuffer) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
}

func (b *BasicBuffer) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))
element := b.activeIssuers.Compute(issuerID, func(_ *ring.Ring, exists bool) *ring.Ring {
if exists {
panic(fmt.Sprintf("issuer queue already exists: %s", issuerID.String()))
}

return b.ringInsert(NewIssuerQueue(issuerID, func(totalSizeDelta int64, readySizeDelta int64) {
if totalSizeDelta != 0 {
b.totalBlocksCount.Add(totalSizeDelta)
}
if readySizeDelta != 0 {
b.readyBlocksCount.Add(readySizeDelta)
}
}))
})

issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}

return issuerQueue
}
Expand Down Expand Up @@ -127,7 +141,7 @@ func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}
b.size.Sub(int64(issuerQueue.Size()))
issuerQueue.Clear()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
Expand Down Expand Up @@ -158,10 +172,8 @@ func (b *BasicBuffer) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, false
}

b.size.Inc()

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > maxBuffer {
if b.TotalBlocksCount() > maxBuffer {
return b.dropTail(quantumFunc, maxBuffer), true
}

Expand All @@ -178,40 +190,14 @@ func (b *BasicBuffer) Ready(block *blocks.Block) bool {
return issuerQueue.Ready(block)
}

// ReadyBlocksCount returns the number of ready blocks in the buffer.
func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
start := b.Current()
if start == nil {
return
}

for q := start; ; {
readyBlocksCount += q.readyHeap.Len()
q = b.Next()
if q == start {
break
}
}

return
}

// TotalBlocksCount returns the number of blocks in the buffer.
func (b *BasicBuffer) TotalBlocksCount() (blocksCount int) {
start := b.Current()
if start == nil {
return
}
for q := start; ; {
blocksCount += q.readyHeap.Len()
blocksCount += q.nonReadyMap.Size()
q = b.Next()
if q == start {
break
}
}
return int(b.totalBlocksCount.Load())
}

return
// ReadyBlocksCount returns the number of ready blocks in the buffer.
func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
return int(b.readyBlocksCount.Load())
}

// Next returns the next IssuerQueue in round-robin order.
Expand Down Expand Up @@ -250,8 +236,6 @@ func (b *BasicBuffer) PopFront() *blocks.Block {
return nil
}

b.size.Dec()

return block
}

Expand All @@ -275,7 +259,7 @@ func (b *BasicBuffer) IssuerIDs() []iotago.AccountID {

func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
for b.TotalBlocksCount() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.mustLongestQueueIssuerID(quantumFunc)
longestQueue := b.IssuerQueue(maxIssuerID)
Expand All @@ -288,7 +272,6 @@ func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu
panic("buffer is full, but tail of longest queue does not exist")
}

b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}

Expand Down
60 changes: 44 additions & 16 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@ import (

// IssuerQueue keeps the submitted blocks of an issuer.
type IssuerQueue struct {
issuerID iotago.AccountID
issuerID iotago.AccountID
muXxer marked this conversation as resolved.
Show resolved Hide resolved
sizeChangedFunc func(totalSizeDelta int64, readySizeDelta int64, workDelta int64)

nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64

size atomic.Int64
work atomic.Int64
}

// NewIssuerQueue returns a new IssuerQueue.
func NewIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return &IssuerQueue{
func NewIssuerQueue(issuerID iotago.AccountID, sizeChangedCallback func(totalSizeDelta int64, readySizeDelta int64)) *IssuerQueue {
queue := &IssuerQueue{
issuerID: issuerID,
nonReadyMap: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
}

queue.sizeChangedFunc = func(totalSizeDelta int64, readySizeDelta int64, workDelta int64) {
if totalSizeDelta != 0 {
queue.size.Add(totalSizeDelta)
}
if workDelta != 0 {
queue.work.Add(workDelta)
}

if sizeChangedCallback != nil {
sizeChangedCallback(totalSizeDelta, readySizeDelta)
}
}

return queue
}

// Size returns the total number of blocks in the queue.
Expand Down Expand Up @@ -70,21 +88,19 @@ func (q *IssuerQueue) Submit(element *blocks.Block) bool {
}

q.nonReadyMap.Set(element.ID(), element)
q.size.Inc()
q.work.Add(int64(element.WorkScore()))
q.sizeChangedFunc(1, 0, int64(element.WorkScore()))

return true
}

// Unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {
// unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) unsubmit(block *blocks.Block) bool {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.nonReadyMap.Delete(block.ID())
q.size.Dec()
q.work.Sub(int64(block.WorkScore()))
q.sizeChangedFunc(-1, 0, -int64(block.WorkScore()))

return true
}
Expand All @@ -98,6 +114,8 @@ func (q *IssuerQueue) Ready(block *blocks.Block) bool {
q.nonReadyMap.Delete(block.ID())
heap.Push(&q.readyHeap, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})

q.sizeChangedFunc(0, 1, 0)

return true
}

Expand All @@ -112,6 +130,18 @@ func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {
return ids
}

// Clear removes all blocks from the queue.
func (q *IssuerQueue) Clear() {
readyBlocksCount := int64(q.readyHeap.Len())

q.nonReadyMap.Clear()
for q.readyHeap.Len() > 0 {
_ = q.readyHeap.Pop()
}

q.sizeChangedFunc(-int64(q.Size()), -readyBlocksCount, -int64(q.Work()))
}

// Front returns the first ready block in the queue.
func (q *IssuerQueue) Front() *blocks.Block {
if q == nil || q.readyHeap.Len() == 0 {
Expand All @@ -132,8 +162,7 @@ func (q *IssuerQueue) PopFront() *blocks.Block {
panic("unable to pop from a non-empty heap.")
}
blk := heapElement.Value
q.size.Dec()
q.work.Sub(int64(blk.WorkScore()))
q.sizeChangedFunc(-1, -1, -int64(blk.WorkScore()))

return blk
}
Expand All @@ -152,7 +181,7 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
heapTailIndex := q.heapTail()
// if heap tail (oldest ready block) does not exist or is newer than oldest non-ready block, unsubmit the oldest non-ready block
if oldestNonReadyBlock != nil && (heapTailIndex < 0 || q.readyHeap[heapTailIndex].Key.CompareTo(timed.HeapKey(oldestNonReadyBlock.IssuingTime())) > 0) {
if q.Unsubmit(oldestNonReadyBlock) {
if q.unsubmit(oldestNonReadyBlock) {
return oldestNonReadyBlock
}
} else if heapTailIndex < 0 { // the heap is empty
Expand All @@ -166,8 +195,7 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
panic("trying to remove a heap element that does not exist.")
}
blk := heapElement.Value
q.size.Dec()
q.work.Sub(int64(blk.WorkScore()))
q.sizeChangedFunc(-1, -1, -int64(blk.WorkScore()))

return blk
}
Expand Down
Loading
Loading