-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathsimplehub.go
168 lines (144 loc) · 4.17 KB
/
simplehub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Copyright 2016 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
package pubsub
import (
"sync"
"github.com/juju/clock"
)
// prePublishTestHook, when non-nil, is called by SimpleHub.Publish after the
// hub mutex has been taken and before any subscriber is notified.
// NOTE(review): the name suggests it is only ever set by tests — confirm.
var prePublishTestHook func()
// SimpleHubConfig is the argument struct for NewSimpleHub.
//
// Every field is optional: NewSimpleHub substitutes a default for any field
// that is left nil.
type SimpleHubConfig struct {
// Logger allows specifying a logging implementation for debug
// and trace level messages emitted from the hub. When nil,
// NewSimpleHub uses a noOpLogger.
Logger Logger
// Metrics allows the passing in of a metrics collector. When nil,
// NewSimpleHub uses a noOpMetrics.
Metrics Metrics
// Clock defines a clock to help improve test coverage. When nil,
// NewSimpleHub uses clock.WallClock.
Clock clock.Clock
}
// SimpleHub provides the base functionality of dealing with subscribers,
// and the notification of subscribers of events.
//
// The struct embeds a sync.Mutex by value, so a SimpleHub must not be copied
// once in use; NewSimpleHub returns a pointer and every method takes a
// pointer receiver.
type SimpleHub struct {
// mutex guards subscribers and idx. Publish, SubscribeMatch and
// unsubscribe all hold it while reading or changing those fields.
mutex sync.Mutex
// subscribers holds the active subscriptions in the order in which
// they were added.
subscribers []*subscriber
// idx is the id given to the next subscriber. It is only ever
// incremented, once per subscriber created by SubscribeMatch.
idx int
// logger, metrics and clock are fixed by NewSimpleHub and handed to
// every subscriber created by the hub. metrics is also used directly
// by the hub to record publish, subscribe and unsubscribe events.
logger Logger
metrics Metrics
clock clock.Clock
}
// NewSimpleHub constructs a SimpleHub from the supplied configuration.
//
// A nil config is treated the same as an empty SimpleHubConfig. Any field
// that is nil falls back to a default: a no-op logger, a no-op metrics
// collector and the wall clock respectively.
//
// A simple hub does not touch the data that is passed through to Publish;
// the same value is handed to each Subscriber. Note that all subscribers
// are notified in parallel, and that no modification should be done to the
// data or data races will occur.
func NewSimpleHub(config *SimpleHubConfig) *SimpleHub {
	// Start from the defaults and let the configuration override them.
	hub := &SimpleHub{
		logger:  noOpLogger{},
		metrics: noOpMetrics{},
		clock:   clock.WallClock,
	}
	if config == nil {
		return hub
	}

	if config.Logger != nil {
		hub.logger = config.Logger
	}
	if config.Metrics != nil {
		hub.metrics = config.Metrics
	}
	if config.Clock != nil {
		hub.clock = config.Clock
	}
	return hub
}
// Publish notifies every subscriber whose topic matcher accepts the given
// topic, so that its handler function is called.
//
// The data is handed to each Subscriber untouched. Note that all subscribers
// are notified in parallel, and that no modification should be done to the
// data or data races will occur.
//
// The returned function, when called, blocks until every callback triggered
// by this call to Publish has been marked as done.
func (h *SimpleHub) Publish(topic string, data interface{}) func() {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	if prePublishTestHook != nil {
		prePublishTestHook()
	}

	// One count is added per notified subscriber; each callback releases
	// its count through doneFn.
	pending := new(sync.WaitGroup)
	for _, sub := range h.subscribers {
		if !sub.topicMatcher(topic) {
			continue
		}
		pending.Add(1)
		sub.notify(handlerCallback{
			topic:  topic,
			data:   data,
			doneFn: pending.Done,
		})
	}

	h.metrics.Published(topic)
	return pending.Wait
}
// Subscribe registers a handler function for a single topic. If the
// published topic is the same as the topic given here, the handler function
// is called with the published topic and the associated data.
//
// The work is delegated to SubscribeMatch using an equalTopic matcher.
//
// The return value is a function that will unsubscribe the caller from
// the hub, for this subscription.
func (h *SimpleHub) Subscribe(topic string, handler func(string, interface{})) func() {
	matcher := equalTopic(topic)
	unsubscribe := h.SubscribeMatch(matcher, handler)
	return unsubscribe
}
// SubscribeMatch takes a function that determines whether a topic matches,
// and a handler function. If the matcher accepts the published topic, the
// handler function is called with the published topic and the associated
// data.
//
// If either matcher or handler is nil, nothing is registered and the
// returned function does nothing.
//
// The return value is a function that will unsubscribe the caller from
// the hub, for this subscription.
func (h *SimpleHub) SubscribeMatch(matcher func(string) bool, handler func(string, interface{})) func() {
	if matcher == nil || handler == nil {
		// Safe but useless: there is nothing to subscribe, so hand back
		// an unsubscribe function with nothing to undo.
		return func() {}
	}

	h.mutex.Lock()
	defer h.mutex.Unlock()

	// Each subscriber takes the current index as its id, then the index
	// moves on.
	id := h.idx
	sub := newSubscriber(id, matcher, handler, h.logger, h.metrics, h.clock)
	h.idx = id + 1

	h.subscribers = append(h.subscribers, sub)
	h.metrics.Subscribed()

	return func() {
		h.unsubscribe(sub.id)
	}
}
// unsubscribe closes and removes the subscriber with the given id. The order
// of the remaining subscribers is preserved. If no subscriber has that id,
// for example because it has already been unsubscribed, nothing happens.
func (h *SimpleHub) unsubscribe(id int) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	for i, sub := range h.subscribers {
		if sub.id != id {
			continue
		}
		sub.close()

		// Shift the tail down over the removed entry, then clear the slot
		// that falls off the end. Without the clear, the backing array keeps
		// a stale *subscriber beyond len, which can keep a removed
		// subscriber (and the handler it holds) reachable.
		last := len(h.subscribers) - 1
		copy(h.subscribers[i:], h.subscribers[i+1:])
		h.subscribers[last] = nil
		h.subscribers = h.subscribers[:last]

		h.metrics.Unsubscribed()
		return
	}
}
// Wait takes the returning function from Publish and returns a channel. The
// channel is closed once the returning function is done.
//
// done is run on a new goroutine, so Wait itself never blocks. This lets a
// caller select on completion alongside other events. If done never
// returns, the channel is never closed.
//
// A nil done means there is nothing to wait for, and the returned channel is
// already closed. Calling a nil function on the spawned goroutine would
// instead panic where the caller cannot recover it.
func Wait(done func()) <-chan struct{} {
	ch := make(chan struct{})
	if done == nil {
		close(ch)
		return ch
	}
	go func() {
		done()
		close(ch)
	}()
	return ch
}
// handlerCallback describes one notification handed to a subscriber for a
// single published message.
type handlerCallback struct {
	// topic is the topic the message was published on.
	topic string
	// data is the published payload, passed through as given to Publish.
	data interface{}
	// doneFn is run to signal that this callback has been dealt with.
	// Publish sets it to the Done method of its wait group.
	doneFn func()
}

// done signals completion of the callback by running doneFn.
func (cb *handlerCallback) done() {
	cb.doneFn()
}