-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathring.go
99 lines (78 loc) · 2.22 KB
/
ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package syncbuffer
import (
"sync"
)
// NewRingBuffer returns an empty ring buffer able to hold size items.
//
// A size below 1 is raised to 1. A zero-length backing slice would make the
// first Add panic with an integer division by zero when the cursor is reduced
// modulo the buffer length, and a negative size would make the allocation
// itself panic.
func NewRingBuffer(size int) *RingBuffer {
	if size < 1 {
		size = 1
	}
	return &RingBuffer{
		items: make([]interface{}, size),
	}
}
// RingBuffer is a fixed-capacity buffer holding items of any type.
// Once it is full, each newly added item replaces the oldest one.
type RingBuffer struct {
	// lock guards count and items.
	lock sync.RWMutex
	// count is the number of items added so far. It is also the cursor
	// that the next added item will receive.
	count int
	// items is the fixed-length backing storage; a cursor is mapped to a
	// slot by taking it modulo len(items).
	items []interface{}
}
// ReadFrom returns every item at or after the given cursor, oldest first,
// together with a new cursor that the next call should pass in so that
// successive reads stay contiguous.
//
// A negative cursor yields (nil, 0). A cursor at or beyond the number of items
// added so far yields a nil slice and the current count, which tells the
// caller there is nothing new yet. If the item at cursor has already been
// overwritten, everything currently held in the buffer is returned.
func (r *RingBuffer) ReadFrom(cursor int) ([]interface{}, int) {
	if cursor < 0 {
		return nil, 0
	}

	// Take the read lock exactly once. sync.RWMutex prohibits recursive
	// read-locking: if a writer queues up between two nested RLock calls,
	// the inner RLock waits for the writer while the writer waits for the
	// outer RLock, and both block forever. The work is therefore done
	// inline instead of calling methods that acquire r.lock themselves,
	// such as OldestCursor.
	r.lock.RLock()
	defer r.lock.RUnlock()

	// Nothing newer than cursor: signal the caller to back off.
	if cursor >= r.count {
		return nil, r.count
	}

	// A valid cursor that is older than the oldest retained item is moved
	// forward so that the whole buffer is returned. The difference is
	// negative while the buffer has not wrapped, so cursor is left alone.
	if oldest := r.count - len(r.items); cursor < oldest {
		cursor = oldest
	}

	items := make([]interface{}, 0, r.count-cursor)
	for c := cursor; c < r.count; c++ {
		items = append(items, r.items[c%len(r.items)])
	}
	return items, r.count
}
// Add stores item as the newest entry in the buffer.
// Once the buffer is full, the slot of the oldest entry is reused.
func (r *RingBuffer) Add(item interface{}) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// The running count is the cursor of the item being written.
	slot := r.index(r.count)
	r.items[slot] = item
	r.count++
}
// OldestCursor reports the cursor of the oldest item still held in the buffer.
// It is 0 until more items have been added than the buffer can hold.
func (r *RingBuffer) OldestCursor() int {
	r.lock.RLock()
	defer r.lock.RUnlock()

	// The difference is zero or negative while nothing has been overwritten.
	if overwritten := r.count - len(r.items); overwritten > 0 {
		return overwritten
	}
	return 0
}
// list returns the items from cursor up to the newest one, oldest first.
// It returns nil when cursor is at or beyond the current count.
//
// The caller must already hold r.lock (read or write). list does not lock
// for two reasons: count has to be read under the same lock as the items it
// bounds, and sync.RWMutex prohibits recursive read-locking, so acquiring the
// read lock here while the caller already holds it can deadlock as soon as a
// writer queues up in between.
func (r *RingBuffer) list(cursor int) []interface{} {
	size := r.count - cursor
	// If the cursor is too far ahead, there is nothing to return.
	if size <= 0 {
		return nil
	}

	items := make([]interface{}, 0, size)
	for c := cursor; c < r.count; c++ {
		items = append(items, r.items[r.index(c)])
	}
	return items
}
// index maps a cursor onto its slot in the backing slice by wrapping it
// around the length of that slice.
func (r *RingBuffer) index(cursor int) int {
	capacity := len(r.items)
	return cursor % capacity
}