forked from zeromq/goczmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channeler.go
365 lines (321 loc) · 8.89 KB
/
channeler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
package goczmq
/*
#include "czmq.h"
void Sock_init() {zsys_init();}
*/
import "C"
import (
"fmt"
"strings"
)
// Channeler exposes a ZeroMQ socket through Go channels so that all
// access to the socket is serialized. Two goroutines back every
// Channeler: one owns the ZeroMQ socket and forwards what it receives
// to RecvChan, the other listens on SendChan and the internal command
// channel and relays everything to the socket goroutine over inproc
// sockets. Failures from either goroutine are delivered on ErrChan.
type Channeler struct {
	// id is the unique identifier of this Channeler.
	id string
	// sockType is the ZeroMQ socket type of the wrapped socket.
	sockType int
	// endpoints is the endpoint list the wrapped socket is attached to.
	endpoints string
	// subscribe holds the comma separated initial subscription topics,
	// or nil when there are none.
	subscribe *string

	// commandAddr is the inproc address of the command pipe.
	commandAddr string
	// proxyAddr is the inproc address used to relay outgoing messages.
	proxyAddr string

	// commandChan carries textual commands such as "destroy".
	commandChan chan<- string
	// SendChan accepts multi-frame messages to send on the socket.
	SendChan chan<- [][]byte
	// RecvChan delivers multi-frame messages received from the socket.
	RecvChan <-chan [][]byte
	// ErrChan delivers errors raised by the Channeler goroutines.
	ErrChan <-chan error
	// errChan is the sending side used internally to report errors.
	errChan chan<- error

	// destroyed is set once a destroy command has been handled.
	destroyed bool
}
// Destroy queues a "destroy" command asking the Channeler to shut down
// and clean up. It does nothing when the Channeler is already marked
// as destroyed.
func (c *Channeler) Destroy() {
	if !c.destroyed {
		c.commandChan <- "destroy"
	}
}
// Subscribe queues a command that adds topic to the subscriptions of
// the wrapped socket. It does nothing when the Channeler is already
// marked as destroyed.
func (c *Channeler) Subscribe(topic string) {
	if !c.destroyed {
		command := fmt.Sprintf("subscribe %s", topic)
		c.commandChan <- command
	}
}
// Unsubscribe queues a command that removes topic from the
// subscriptions of the wrapped socket. It does nothing when the
// Channeler is already marked as destroyed.
func (c *Channeler) Unsubscribe(topic string) {
	if !c.destroyed {
		command := fmt.Sprintf("unsubscribe %s", topic)
		c.commandChan <- command
	}
}
// actor is the goroutine that owns the wrapped ZeroMQ socket. It
// creates the socket, attaches it to c.endpoints and then polls three
// sockets until it is told to stop or an error occurs:
//
//   - the wrapped socket: every received message is forwarded to
//     recvChan;
//   - a pull socket on c.proxyAddr: every message read from it is sent
//     out through the wrapped socket;
//   - a pair socket on c.commandAddr: carries the "destroy",
//     "subscribe <topic>" and "unsubscribe <topic>" commands, each of
//     which is acknowledged with a single "ok" frame.
//
// options are passed to NewSock when the wrapped socket is created.
// Every error is reported on c.errChan and terminates the actor.
// recvChan is closed when the actor returns, whatever the reason.
func (c *Channeler) actor(recvChan chan<- [][]byte, options []SockOption) {
	// Registered first so that receivers blocked on the receive channel
	// are released on every exit path, including a failure to create
	// the command pipe below.
	defer close(recvChan)

	pipe, err := NewPair(fmt.Sprintf(">%s", c.commandAddr))
	if err != nil {
		c.errChan <- err
		return
	}
	defer pipe.Destroy()

	pull, err := NewPull(c.proxyAddr)
	if err != nil {
		c.errChan <- err
		return
	}
	defer pull.Destroy()

	sock := NewSock(c.sockType, options...)
	defer sock.Destroy()

	// serverish is the second argument to Attach: true for the socket
	// types documented as binding by default, false for those that
	// connect by default.
	var serverish bool
	switch c.sockType {
	case Pub, Rep, Pull, Router, XPub:
		serverish = true
	case Req, Push, Dealer, Pair, Stream, XSub:
		// Attached with serverish left false.
	case Sub:
		// Initial subscriptions are applied before attaching.
		if c.subscribe != nil {
			for _, topic := range strings.Split(*c.subscribe, ",") {
				sock.SetOption(SockSetSubscribe(topic))
			}
		}
	default:
		c.errChan <- ErrInvalidSockType
		return
	}
	if err = sock.Attach(c.endpoints, serverish); err != nil {
		c.errChan <- err
		return
	}

	poller, err := NewPoller(sock, pull, pipe)
	if err != nil {
		c.errChan <- err
		return
	}
	defer poller.Destroy()

	for {
		s, err := poller.Wait(-1)
		if err != nil {
			c.errChan <- err
			return
		}

		switch s {
		case pipe:
			cmd, err := pipe.RecvMessage()
			if err != nil {
				c.errChan <- err
				return
			}

			switch string(cmd[0]) {
			case "destroy":
				for _, endpoint := range strings.Split(c.endpoints, ",") {
					sock.Disconnect(endpoint)
				}
				pipe.SendMessage([][]byte{[]byte("ok")})
				return
			case "subscribe":
				topic := string(cmd[1])
				sock.SetOption(SockSetSubscribe(topic))
				pipe.SendMessage([][]byte{[]byte("ok")})
			case "unsubscribe":
				topic := string(cmd[1])
				sock.SetOption(SockSetUnsubscribe(topic))
				pipe.SendMessage([][]byte{[]byte("ok")})
			}

		case sock:
			msg, err := s.RecvMessage()
			if err != nil {
				c.errChan <- err
				return
			}
			recvChan <- msg

		case pull:
			msg, err := pull.RecvMessage()
			if err != nil {
				c.errChan <- err
				return
			}
			if err = sock.SendMessage(msg); err != nil {
				c.errChan <- err
				return
			}
		}
	}
}
// channeler is the goroutine that bridges the Go channels to the
// socket goroutine. It runs a select loop over two channels:
//
//   - commandChan: "destroy" is forwarded over the command pipe, the
//     reply is awaited and the goroutine exits. Any other command is
//     split at the first space into a verb frame and an argument frame
//     (so arguments containing spaces stay intact), sent over the
//     command pipe, and the reply is awaited.
//   - sendChan: each message is pushed to the socket on c.proxyAddr.
//
// Errors are reported on c.errChan. Failing to create either internal
// socket terminates the goroutine; errors while relaying do not.
//
// NOTE(review): c.destroyed is written here while Destroy, Subscribe
// and Unsubscribe read it from the caller's goroutine without any
// synchronization.
func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byte) {
	push, err := NewPush(c.proxyAddr)
	if err != nil {
		c.errChan <- err
		return
	}
	defer push.Destroy()

	pipe, err := NewPair(fmt.Sprintf("@%s", c.commandAddr))
	if err != nil {
		c.errChan <- err
		return
	}
	defer pipe.Destroy()

	for {
		select {
		case cmd := <-commandChan:
			if cmd == "destroy" {
				c.destroyed = true
				if err := pipe.SendFrame([]byte("destroy"), FlagNone); err != nil {
					c.errChan <- err
					return
				}
				if _, err := pipe.RecvMessage(); err != nil {
					c.errChan <- err
				}
				return
			}

			// Split only once: everything after the verb is a single
			// argument, even when it contains spaces.
			parts := strings.SplitN(cmd, " ", 2)
			message := make([][]byte, len(parts))
			for i, p := range parts {
				message[i] = []byte(p)
			}

			if err := pipe.SendMessage(message); err != nil {
				c.errChan <- err
				// The command never reached the other end of the pipe,
				// so no acknowledgement will arrive; waiting for one
				// would block this goroutine forever.
				continue
			}
			if _, err := pipe.RecvMessage(); err != nil {
				c.errChan <- err
			}

		case msg := <-sendChan:
			if err := push.SendMessage(msg); err != nil {
				c.errChan <- err
			}
		}
	}
}
// newChanneler is the shared constructor behind the socket type
// specific New*Channeler functions. It builds a Channeler for the given
// socket type and endpoint list, derives the two inproc addresses from
// a freshly generated UUID, records the initial subscription topics
// (joined with commas) when subscribe is non-empty, and starts the
// channeler and actor goroutines. options are handed to the actor
// goroutine together with the receive channel.
//
// All channels are unbuffered. Errors raised while the goroutines set
// themselves up are reported on the returned Channeler's ErrChan.
func newChanneler(sockType int, endpoints string, subscribe []string, options []SockOption) *Channeler {
	commandChan := make(chan string)
	sendChan := make(chan [][]byte)
	recvChan := make(chan [][]byte)
	errChan := make(chan error)

	C.Sock_init()

	// zuuid_str returns a pointer into the zuuid object, so the string
	// is copied into Go memory before the object is destroyed. Without
	// the destroy call every Channeler would leak one zuuid object.
	uuid := C.zuuid_new()
	id := C.GoString(C.zuuid_str(uuid))
	C.zuuid_destroy(&uuid)

	c := &Channeler{
		id:          id,
		endpoints:   endpoints,
		sockType:    sockType,
		commandChan: commandChan,
		SendChan:    sendChan,
		RecvChan:    recvChan,
		ErrChan:     errChan,
		errChan:     errChan,
	}
	c.commandAddr = fmt.Sprintf("inproc://actorcontrol_%s", c.id)
	c.proxyAddr = fmt.Sprintf("inproc://proxy_%s", c.id)

	if len(subscribe) > 0 {
		topics := strings.Join(subscribe, ",")
		c.subscribe = &topics
	}

	go c.channeler(commandChan, sendChan)
	go c.actor(recvChan, options)

	return c
}
// NewPubChanneler returns a Channeler that wraps a Pub socket attached
// to endpoints. The socket binds by default.
func NewPubChanneler(endpoints string, options ...SockOption) *Channeler {
	pub := newChanneler(Pub, endpoints, nil, options)
	return pub
}
// NewSubChanneler returns a Channeler that wraps a Sub socket attached
// to endpoints. The socket connects by default. The variadic arguments
// may mix subscription topics (string values) and socket options
// (SockOption values); they are told apart by their dynamic type.
//
// An argument of any other type does not abort construction: the
// remaining arguments are still processed, and an error describing the
// last unsupported argument is delivered on the Channeler's error
// channel from a separate goroutine.
func NewSubChanneler(endpoints string, varargs ...interface{}) *Channeler {
	topics := make([]string, 0, len(varargs))
	opts := make([]SockOption, 0, len(varargs))
	var argErr error

	for _, arg := range varargs {
		if topic, ok := arg.(string); ok {
			topics = append(topics, topic)
			continue
		}
		if opt, ok := arg.(SockOption); ok {
			opts = append(opts, opt)
			continue
		}
		argErr = fmt.Errorf("Don't know how to handle a %T argument to NewSubChanneler", arg)
	}

	sub := newChanneler(Sub, endpoints, topics, opts)
	if argErr != nil {
		// Sent asynchronously so the constructor does not block
		// waiting for a reader on the error channel.
		go func(errs chan<- error, e error) { errs <- e }(sub.errChan, argErr)
	}
	return sub
}
// NewRepChanneler returns a Channeler that wraps a Rep socket attached
// to endpoints. The socket binds by default.
func NewRepChanneler(endpoints string, options ...SockOption) *Channeler {
	rep := newChanneler(Rep, endpoints, nil, options)
	return rep
}
// NewReqChanneler returns a Channeler that wraps a Req socket attached
// to endpoints. The socket connects by default.
func NewReqChanneler(endpoints string, options ...SockOption) *Channeler {
	req := newChanneler(Req, endpoints, nil, options)
	return req
}
// NewPullChanneler returns a Channeler that wraps a Pull socket
// attached to endpoints. The socket binds by default.
func NewPullChanneler(endpoints string, options ...SockOption) *Channeler {
	pull := newChanneler(Pull, endpoints, nil, options)
	return pull
}
// NewPushChanneler returns a Channeler that wraps a Push socket
// attached to endpoints. The socket connects by default.
func NewPushChanneler(endpoints string, options ...SockOption) *Channeler {
	push := newChanneler(Push, endpoints, nil, options)
	return push
}
// NewRouterChanneler returns a Channeler that wraps a Router socket
// attached to endpoints. The socket binds by default.
func NewRouterChanneler(endpoints string, options ...SockOption) *Channeler {
	router := newChanneler(Router, endpoints, nil, options)
	return router
}
// NewDealerChanneler returns a Channeler that wraps a Dealer socket
// attached to endpoints. The socket connects by default.
func NewDealerChanneler(endpoints string, options ...SockOption) *Channeler {
	dealer := newChanneler(Dealer, endpoints, nil, options)
	return dealer
}
// NewXPubChanneler returns a Channeler that wraps an XPub socket
// attached to endpoints. The socket binds by default.
func NewXPubChanneler(endpoints string, options ...SockOption) *Channeler {
	xpub := newChanneler(XPub, endpoints, nil, options)
	return xpub
}
// NewXSubChanneler returns a Channeler that wraps an XSub socket
// attached to endpoints. The socket connects by default.
func NewXSubChanneler(endpoints string, options ...SockOption) *Channeler {
	xsub := newChanneler(XSub, endpoints, nil, options)
	return xsub
}
// NewPairChanneler returns a Channeler that wraps a Pair socket
// attached to endpoints. The socket connects by default.
func NewPairChanneler(endpoints string, options ...SockOption) *Channeler {
	pair := newChanneler(Pair, endpoints, nil, options)
	return pair
}
// NewStreamChanneler returns a Channeler that wraps a Stream socket
// attached to endpoints. The socket connects by default.
func NewStreamChanneler(endpoints string, options ...SockOption) *Channeler {
	stream := newChanneler(Stream, endpoints, nil, options)
	return stream
}