-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcgroup.go
127 lines (116 loc) · 2.71 KB
/
concgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package concgroup
import (
"context"
"sort"
"sync"
"golang.org/x/sync/errgroup"
)
// Group is a collection of goroutines like errgroup.Group.
//
// In addition to what errgroup.Group offers, every function submitted to a
// Group is associated with one or more string keys, and the function runs
// while holding the per-key mutex of each of its keys.
type Group struct {
	// eg is the underlying errgroup that runs and waits for the goroutines.
	eg *errgroup.Group
	// mu is taken by the Go/TryGo family of methods while they look up or
	// create entries in locks.
	mu sync.Mutex
	// locks maps a key to the mutex held while a function submitted with
	// that key is running.
	locks map[string]*sync.Mutex
	// initOnce makes the lazy setup of eg and locks happen at most once.
	initOnce sync.Once
}
// WithContext builds a Group on top of errgroup.WithContext and returns it
// together with the context that errgroup derived from ctx.
func WithContext(ctx context.Context) (*Group, context.Context) {
	inner, derived := errgroup.WithContext(ctx)
	g := &Group{eg: inner}
	return g, derived
}
// Go calls the given function in a new goroutine like errgroup.Group with key.
//
// The goroutine acquires the mutex associated with key before calling f and
// releases it when f returns, so functions submitted with the same key do
// not run at the same time.
//
// Like errgroup.Group.Go, this call may block when a limit has been set with
// SetLimit and the group is full.
func (g *Group) Go(key string, f func() error) {
	g.mu.Lock()
	g.init()
	mu, ok := g.locks[key]
	if !ok {
		mu = &sync.Mutex{}
		g.locks[key] = mu
	}
	// Release g.mu before submitting: errgroup's Go can block while the
	// group is at its limit, and holding g.mu during that wait would also
	// stall TryGo/TryGoMulti, which are expected to return immediately.
	g.mu.Unlock()

	g.eg.Go(func() error {
		mu.Lock()
		defer mu.Unlock()
		return f()
	})
}
// GoMulti calls the given function in a new goroutine like errgroup.Group with multiple key locks.
//
// The goroutine acquires the mutex of every distinct key in keys before
// calling f and releases all of them when f returns. Mutexes are acquired in
// sorted key order so that every goroutine uses the same acquisition order.
// The keys slice itself is not modified, and a key that appears more than
// once is locked only once.
//
// Like errgroup.Group.Go, this call may block when a limit has been set with
// SetLimit and the group is full.
func (g *Group) GoMulti(keys []string, f func() error) {
	// Sort a private copy so the caller's slice keeps its order.
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)

	g.mu.Lock()
	g.init()
	mus := make([]*sync.Mutex, 0, len(sorted))
	for i, key := range sorted {
		// sync.Mutex is not reentrant: locking the same key's mutex twice
		// in one goroutine would block forever, so skip repeated keys.
		if i > 0 && key == sorted[i-1] {
			continue
		}
		mu, ok := g.locks[key]
		if !ok {
			mu = &sync.Mutex{}
			g.locks[key] = mu
		}
		mus = append(mus, mu)
	}
	// Release g.mu before submitting: errgroup's Go can block while the
	// group is at its limit, and holding g.mu during that wait would also
	// stall TryGo/TryGoMulti, which are expected to return immediately.
	g.mu.Unlock()

	g.eg.Go(func() error {
		for _, mu := range mus {
			mu.Lock()
			// Deliberately deferred inside the loop: every lock must stay
			// held until f has returned.
			defer mu.Unlock()
		}
		return f()
	})
}
// TryGo submits f under key only when the number of active goroutines is
// currently below the configured limit, like errgroup.Group.TryGo.
//
// The started goroutine holds the mutex associated with key for the duration
// of f. The return value is whatever the underlying errgroup's TryGo reports.
func (g *Group) TryGo(key string, f func() error) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.init()

	// Look up the mutex for this key, creating it on first use.
	keyLock, found := g.locks[key]
	if !found {
		keyLock = new(sync.Mutex)
		g.locks[key] = keyLock
	}

	guarded := func() error {
		keyLock.Lock()
		defer keyLock.Unlock()
		return f()
	}
	return g.eg.TryGo(guarded)
}
// TryGoMulti calls the given function only when the number of active goroutines is currently below the configured limit like errgroup.Group with multiple key locks.
//
// The started goroutine acquires the mutex of every distinct key in keys
// before calling f and releases all of them when f returns. Mutexes are
// acquired in sorted key order so that every goroutine uses the same
// acquisition order. The keys slice itself is not modified, and a key that
// appears more than once is locked only once.
//
// The return value is whatever the underlying errgroup's TryGo reports.
func (g *Group) TryGoMulti(keys []string, f func() error) bool {
	// Sort a private copy so the caller's slice keeps its order.
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)

	g.mu.Lock()
	defer g.mu.Unlock()
	g.init()
	mus := make([]*sync.Mutex, 0, len(sorted))
	for i, key := range sorted {
		// sync.Mutex is not reentrant: locking the same key's mutex twice
		// in one goroutine would block forever, so skip repeated keys.
		if i > 0 && key == sorted[i-1] {
			continue
		}
		mu, ok := g.locks[key]
		if !ok {
			mu = &sync.Mutex{}
			g.locks[key] = mu
		}
		mus = append(mus, mu)
	}

	return g.eg.TryGo(func() error {
		for _, mu := range mus {
			mu.Lock()
			// Deliberately deferred inside the loop: every lock must stay
			// held until f has returned.
			defer mu.Unlock()
		}
		return f()
	})
}
// SetLimit caps the number of active goroutines in this group at n by
// delegating to errgroup.Group.SetLimit.
func (g *Group) SetLimit(n int) {
	// A zero-value Group has no errgroup yet; make sure one exists first.
	g.init()
	eg := g.eg
	eg.SetLimit(n)
}
// Wait blocks until all function calls from the Go method have returned like errgroup.Group.
//
// It returns the result of the underlying errgroup's Wait. Calling Wait on a
// zero-value Group that has not started anything is safe.
func (g *Group) Wait() error {
	// A zero-value Group has a nil eg until init runs; without this call,
	// Wait before any Go/SetLimit would dereference a nil pointer.
	g.init()
	return g.eg.Wait()
}
// init lazily prepares the Group for use. The setup runs at most once per
// Group and fills in only the fields that are still nil, so an errgroup
// installed by WithContext is kept.
func (g *Group) init() {
	setup := func() {
		if g.eg == nil {
			g.eg = new(errgroup.Group)
		}
		if g.locks == nil {
			g.locks = make(map[string]*sync.Mutex)
		}
	}
	g.initOnce.Do(setup)
}