forked from berty/berty
-
Notifications
You must be signed in to change notification settings - Fork 3
/
store_message_queue.go
87 lines (72 loc) · 1.93 KB
/
store_message_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package bertyprotocol
import (
"container/heap"
"sync"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/crypto"
"berty.tech/berty/v2/go/pkg/protocoltypes"
"berty.tech/go-orbit-db/stores/operation"
)
// messageItem is one entry managed by priorityMessageQueue.
// Entries are ordered in the queue by the counter found in their headers
// (see Counter).
type messageItem struct {
// op is the operation attached to this item.
// NOTE(review): presumably the orbit-db log operation that carried the message — confirm at the call site.
op operation.Operation
// env is the message envelope attached to this item.
env *protocoltypes.MessageEnvelope
// headers holds the message headers; Counter dereferences it to read the
// ordering key, so it must be non-nil before the item is queued.
headers *protocoltypes.MessageHeaders
// ownPK is a public key attached to this item.
// NOTE(review): presumably the local member/device key — confirm against caller.
ownPK crypto.PubKey
// hash is a CID attached to this item.
// NOTE(review): presumably identifies the log entry of the message — confirm against caller.
hash cid.Cid
}
// Counter returns the counter stored in the item's message headers.
// priorityMessageQueue.Less uses it as the ordering key. Both m and
// m.headers are dereferenced, so neither may be nil.
func (m *messageItem) Counter() uint64 {
	hdrs := m.headers
	return hdrs.Counter
}
// priorityMessageQueue is a min-heap of messageItems ordered by Counter.
//
// It implements heap.Interface through Len, Less, Swap, Push and Pop; those
// methods do not take any lock. Add, Next and Size acquire muMessages around
// their access to the heap.
type priorityMessageQueue struct {
// messages is the backing slice of the heap.
messages []*messageItem
// muMessages is locked by Add, Next and Size while they touch messages.
muMessages sync.RWMutex
}
// newPriorityMessageQueue builds an empty queue and runs heap.Init on it so
// it is ready for Add and Next.
func newPriorityMessageQueue() *priorityMessageQueue {
	pq := new(priorityMessageQueue)
	pq.messages = make([]*messageItem, 0)

	heap.Init(pq)

	return pq
}
// Add inserts m into the queue while holding the write lock, keeping the
// heap ordered by Counter.
//
// The lock is released through defer: heap.Push re-orders the heap via Less,
// which dereferences each item's headers. If that panics (for example on an
// item with nil headers), the deferred unlock still runs, so a recovered
// panic does not leave muMessages locked for every later caller.
func (pq *priorityMessageQueue) Add(m *messageItem) {
	pq.muMessages.Lock()
	defer pq.muMessages.Unlock()

	heap.Push(pq, m)
}
// Next removes and returns the item with the lowest Counter, or nil when the
// queue is empty. It holds the write lock for the whole operation.
//
// The lock is released through defer: heap.Pop re-orders the heap via Less,
// which dereferences each item's headers. If that panics, the deferred
// unlock still runs, so a recovered panic does not leave muMessages locked.
func (pq *priorityMessageQueue) Next() *messageItem {
	pq.muMessages.Lock()
	defer pq.muMessages.Unlock()

	if len(pq.messages) == 0 {
		return nil
	}

	return heap.Pop(pq).(*messageItem)
}
// Size reports how many items are currently queued. Unlike Len, it takes the
// read lock before looking at the slice.
func (pq *priorityMessageQueue) Size() int {
	pq.muMessages.RLock()
	defer pq.muMessages.RUnlock()

	return pq.Len()
}
// Len is part of heap.Interface. It returns the number of queued items and
// takes no lock itself.
func (pq *priorityMessageQueue) Len() int {
	return len(pq.messages)
}
// Less is part of heap.Interface. It reports whether the item at index i has
// a strictly smaller Counter than the item at index j, which makes the heap
// hand out the lowest counter first.
func (pq *priorityMessageQueue) Less(i, j int) bool {
	counterI := pq.messages[i].Counter()
	counterJ := pq.messages[j].Counter()

	return counterI < counterJ
}
// Swap is part of heap.Interface. It exchanges the items at indexes i and j.
func (pq *priorityMessageQueue) Swap(i, j int) {
	items := pq.messages
	items[i], items[j] = items[j], items[i]
}
// Push is part of heap.Interface. It appends x, which must be a
// *messageItem, to the end of the backing slice; the type assertion panics
// for any other type.
func (pq *priorityMessageQueue) Push(x interface{}) {
	entry := x.(*messageItem)
	pq.messages = append(pq.messages, entry)
}
// Pop is part of heap.Interface. It removes and returns the last element of
// the backing slice, or a nil interface when the slice is empty.
func (pq *priorityMessageQueue) Pop() interface{} {
	last := len(pq.messages) - 1
	if last < 0 {
		return nil
	}

	items := pq.messages
	popped := items[last]
	// Clear the vacated slot so the backing array no longer references the item.
	items[last] = nil
	pq.messages = items[:last]

	return popped
}