-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconc_limiter.go
96 lines (79 loc) · 1.87 KB
/
conc_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package multilimiter
import (
"context"
"sync"
)
// ConcLimiter provides a thread-safe mechanism for acquiring and releasing slots.
// Calling Cancel() will unblock Acquire() if it's waiting on a slot.
type ConcLimiter interface {
// Acquire waits for a slot to become available.
// Only DeadlineExceeded and LimiterStopped errors can be returned.
Acquire(ctx context.Context) (Slot, error)
// Slots are put back into the pool through Slot.Release; the
// limiter-level method below is commented out.
// Release()
// Cancel cancels processing of outstanding acquisition requests.
Cancel()
// Concurrency reports the configured concurrency.
Concurrency() int
// Wait blocks until all Slots have been returned.
Wait()
}
// Slot is the handle returned by ConcLimiter.Acquire for one acquired slot.
type Slot interface {
// Release gives the slot back so that it can be acquired again.
Release()
}
// slot is the Slot implementation handed out by the limiter. It carries the
// callback that returns the slot and makes sure that callback fires at most
// once.
type slot struct {
	once      sync.Once
	releaseFn func()
}

// Release invokes releaseFn the first time it is called. Every later call is
// a no-op, so releasing the same slot repeatedly cannot return it twice.
func (s *slot) Release() {
	s.once.Do(s.releaseFn)
}
// BasicConcLimiter is a ConcLimiter that keeps its free slots as tokens in a
// channel. Use NewConcLimiter to construct one.
type BasicConcLimiter struct {
// size is the concurrency reported by Concurrency(); NewConcLimiter also
// uses it as the capacity of slots.
size int
// slots holds one token per free slot. done is not referenced by the code
// shown in this file — NOTE(review): confirm whether it is still needed.
slots, done chan struct{}
// canceler is checked by Acquire and triggered by Cancel.
canceler *Canceler
// wg is incremented when a slot is handed out and decremented when it is
// released; Wait blocks on it.
wg sync.WaitGroup
}
// Compile-time check that *BasicConcLimiter satisfies ConcLimiter.
var _ ConcLimiter = (*BasicConcLimiter)(nil)
// NewConcLimiter builds a concurrency limiter that hands out at most size
// slots at a time. Any size below 1 is raised to 1.
func NewConcLimiter(size int) *BasicConcLimiter {
	if size < 1 {
		size = 1
	}

	// Pre-fill the pool so that every slot starts out available.
	pool := make(chan struct{}, size)
	for len(pool) < cap(pool) {
		pool <- struct{}{}
	}

	return &BasicConcLimiter{
		size:     size,
		slots:    pool,
		canceler: NewCanceler(),
	}
}
// Acquire blocks until a slot is free, the limiter is canceled, or ctx is done.
//
// It returns LimiterStopped when the limiter has been canceled and
// DeadlineExceeded when ctx is done (whether by deadline or by cancellation);
// no other error is returned. On success the caller owns the returned Slot
// and should call Release on it to give the slot back.
func (me *BasicConcLimiter) Acquire(ctx context.Context) (Slot, error) {
	// select chooses at random among ready cases, so with a free slot it
	// could still hand one out after the limiter was canceled or to a caller
	// whose context has already ended. Rule both out up front.
	if me.canceler.IsCanceled() {
		return nil, LimiterStopped
	}
	if ctx.Err() != nil {
		return nil, DeadlineExceeded
	}

	// wait for a slot to become available
	select {
	case <-me.canceler.Done():
		return nil, LimiterStopped
	case <-ctx.Done():
		return nil, DeadlineExceeded
	case <-me.slots:
		// Count the slot as outstanding so Wait blocks until it is released.
		me.wg.Add(1)
		return &slot{releaseFn: me.release}, nil
	}
}
// Cancel triggers the limiter's canceler. Acquire consults that canceler and
// returns LimiterStopped once it reports cancellation.
func (me *BasicConcLimiter) Cancel() {
me.canceler.Cancel()
}
// release puts one slot token back into the pool and marks the slot as no
// longer outstanding. It is the callback handed to each slot by Acquire.
func (me *BasicConcLimiter) release() {
	// Nothing can have been acquired from a limiter without slots.
	if me.size < 1 {
		return
	}

	// Return the token before signalling the WaitGroup, so that by the time
	// Wait unblocks for this slot its token is already back in the pool.
	me.slots <- struct{}{}
	me.wg.Done()
}
// Concurrency returns the limiter's size field, which NewConcLimiter sets to
// the requested size (raised to 1 when a smaller value is given).
func (me *BasicConcLimiter) Concurrency() int {
return me.size
}
// Wait blocks on the limiter's WaitGroup, which Acquire increments for every
// slot it hands out and release decrements when a slot is given back.
func (me *BasicConcLimiter) Wait() {
me.wg.Wait()
}