forked from trustmaster/goflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraph_connect.go
349 lines (317 loc) · 9.63 KB
/
graph_connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package goflow
import (
"fmt"
"reflect"
"strconv"
)
// address is a full port accessor including the index part
type address struct {
proc string
port string
key string
index int
}
func (a address) String() string {
if a.key != "" {
return fmt.Sprintf("%s.%s[%s]", a.proc, a.port, a.key)
}
return fmt.Sprintf("%s.%s", a.proc, a.port)
}
// connection stores information about a connection within the net.
type connection struct {
src address
tgt address
channel reflect.Value
buffer int
}
// Connect connects a sender to a receiver and creates a channel between them using BufferSize configuratio nof the graph.
// Normally such a connection is unbuffered but you can change by setting flow.DefaultBufferSize > 0 or
// by using ConnectBuf() function instead.
// It returns true on success or panics and returns false if error occurs.
func (n *Graph) Connect(senderName, senderPort, receiverName, receiverPort string) error {
return n.ConnectBuf(senderName, senderPort, receiverName, receiverPort, n.conf.BufferSize)
}
// ConnectBuf connects a sender to a receiver using a channel with a buffer of a given size.
// It returns true on success or panics and returns false if error occurs.
func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort string, bufferSize int) error {
sendAddr := parseAddress(senderName, senderPort)
sendPort, err := n.getProcPort(senderName, sendAddr.port, reflect.SendDir)
if err != nil {
return fmt.Errorf("connect: %w", err)
}
recvAddr := parseAddress(receiverName, receiverPort)
recvPort, err := n.getProcPort(receiverName, recvAddr.port, reflect.RecvDir)
if err != nil {
return fmt.Errorf("connect: %w", err)
}
isNewChan := false // tells if a new channel will need to be created for this connection
// Try to find an existing outbound channel from the same sender,
// so it can be used as fan-out FIFO
ch := n.findExistingChan(sendAddr, reflect.SendDir)
if !ch.IsValid() || ch.IsNil() {
// Then try to find an existing inbound channel to the same receiver,
// so it can be used as a fan-in FIFO
ch = n.findExistingChan(recvAddr, reflect.RecvDir)
if ch.IsValid() && !ch.IsNil() {
// Increase the number of listeners on this already used channel
n.incChanListenersCount(ch)
} else {
isNewChan = true
}
}
ch, err = attachPort(sendPort, sendAddr, reflect.SendDir, ch, bufferSize)
if err != nil {
return fmt.Errorf("connect '%s.%s': %w", senderName, senderPort, err)
}
if _, err = attachPort(recvPort, recvAddr, reflect.RecvDir, ch, bufferSize); err != nil {
return fmt.Errorf("connect '%s.%s': %w", receiverName, receiverPort, err)
}
if isNewChan {
// Register the first listener on a newly created channel
n.incChanListenersCount(ch)
}
// Add connection info
n.connections = append(n.connections, connection{
src: sendAddr,
tgt: recvAddr,
channel: ch,
buffer: bufferSize})
return nil
}
// getProcPort finds an assignable port field in one of the subprocesses
func (n *Graph) getProcPort(procName, portName string, dir reflect.ChanDir) (reflect.Value, error) {
nilValue := reflect.ValueOf(nil)
// Check if process exists
proc, ok := n.procs[procName]
if !ok {
return nilValue, fmt.Errorf("getProcPort: process '%s' not found", procName)
}
// Check if process is settable
val := reflect.ValueOf(proc)
if val.Kind() == reflect.Ptr && val.IsValid() {
val = val.Elem()
}
if !val.CanSet() {
return nilValue, fmt.Errorf("getProcPort: process '%s' is not settable", procName)
}
// Get the port value
var portVal reflect.Value
var err error
// Check if sender is a net
net, ok := val.Interface().(Graph)
if ok {
// Sender is a net
var ports map[string]port
if dir == reflect.SendDir {
ports = net.outPorts
} else {
ports = net.inPorts
}
port, ok := ports[portName]
if !ok {
return nilValue, fmt.Errorf("getProcPort: subgraph '%s' does not have inport '%s'", procName, portName)
}
portVal, err = net.getProcPort(port.addr.proc, port.addr.port, dir)
} else {
// Sender is a proc
portVal = val.FieldByName(portName)
}
if err == nil && (!portVal.IsValid()) {
err = fmt.Errorf("process '%s' does not have a valid port '%s'", procName, portName)
}
if err != nil {
return nilValue, fmt.Errorf("getProcPort: %w", err)
}
return portVal, nil
}
func attachPort(port reflect.Value, addr address, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) {
if addr.index > -1 {
return attachArrayPort(port, addr.index, dir, ch, bufSize)
}
if addr.key != "" {
return attachMapPort(port, addr.key, dir, ch, bufSize)
}
return attachChanPort(port, dir, ch, bufSize)
}
func attachChanPort(port reflect.Value, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) {
if err := validateChanDir(port.Type(), dir); err != nil {
return ch, err
}
if err := validateCanSet(port); err != nil {
return ch, err
}
ch = selectOrMakeChan(ch, port, port.Type().Elem(), bufSize)
port.Set(ch)
return ch, nil
}
func attachMapPort(port reflect.Value, key string, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) {
if err := validateChanDir(port.Type().Elem(), dir); err != nil {
return ch, err
}
kv := reflect.ValueOf(key)
item := port.MapIndex(kv)
ch = selectOrMakeChan(ch, item, port.Type().Elem().Elem(), bufSize)
if port.IsNil() {
m := reflect.MakeMap(port.Type())
port.Set(m)
}
port.SetMapIndex(kv, ch)
return ch, nil
}
func attachArrayPort(port reflect.Value, key int, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) {
if err := validateChanDir(port.Type().Elem(), dir); err != nil {
return ch, err
}
if port.IsNil() {
m := reflect.MakeSlice(port.Type(), 0, 32)
port.Set(m)
}
if port.Cap() <= key {
port.SetCap(2 * key)
}
if port.Len() <= key {
port.SetLen(key + 1)
}
item := port.Index(key)
ch = selectOrMakeChan(ch, item, port.Type().Elem().Elem(), bufSize)
item.Set(ch)
return ch, nil
}
func validateChanDir(portType reflect.Type, dir reflect.ChanDir) error {
if portType.Kind() != reflect.Chan {
return fmt.Errorf("not a channel")
}
if portType.ChanDir()&dir == 0 {
return fmt.Errorf("channel does not support direction %s", dir.String())
}
return nil
}
func validateCanSet(portVal reflect.Value) error {
if !portVal.CanSet() {
return fmt.Errorf("port is not assignable")
}
return nil
}
func selectOrMakeChan(new, existing reflect.Value, t reflect.Type, bufSize int) reflect.Value {
if !new.IsValid() || new.IsNil() {
if existing.IsValid() && !existing.IsNil() {
return existing
}
chanType := reflect.ChanOf(reflect.BothDir, t)
new = reflect.MakeChan(chanType, bufSize)
}
return new
}
// parseAddress unfolds a string port name into parts, including array index or hashmap key
func parseAddress(proc, port string) address {
n := address{
proc: proc,
port: port,
index: -1,
}
keyPos := 0
key := ""
for i, r := range port {
if r == '[' {
keyPos = i + 1
n.port = port[0:i]
}
if r == ']' {
key = port[keyPos:i]
}
}
if key == "" {
return n
}
if i, err := strconv.Atoi(key); err == nil {
n.index = i
} else {
n.key = key
}
n.key = key
return n
}
// findExistingChan returns a channel attached to receiver if it already exists among connections
func (n *Graph) findExistingChan(addr address, dir reflect.ChanDir) reflect.Value {
var channel reflect.Value
// Find existing channel attached to the receiver
for _, conn := range n.connections {
var a address
if dir == reflect.SendDir {
a = conn.src
} else {
a = conn.tgt
}
if a == addr {
channel = conn.channel
break
}
}
return channel
}
// incChanListenersCount increments SendChanRefCount.
// The count is needed when multiple senders are connected
// to the same receiver. When the network is terminated and
// senders need to close their output port, this counter
// can help to avoid closing the same channel multiple times.
func (n *Graph) incChanListenersCount(c reflect.Value) {
n.chanListenersCountLock.Lock()
defer n.chanListenersCountLock.Unlock()
ptr := c.Pointer()
cnt := n.chanListenersCount[ptr]
cnt++
n.chanListenersCount[ptr] = cnt
}
// decChanListenersCount decrements SendChanRefCount
// It returns true if the RefCount has reached 0
func (n *Graph) decChanListenersCount(c reflect.Value) bool {
n.chanListenersCountLock.Lock()
defer n.chanListenersCountLock.Unlock()
ptr := c.Pointer()
cnt := n.chanListenersCount[ptr]
if cnt == 0 {
return true //yes you may try to close a nonexistant channel, see what happens...
}
cnt--
n.chanListenersCount[ptr] = cnt
return cnt == 0
}
// // Disconnect removes a connection between sender's outport and receiver's inport.
// func (n *Graph) Disconnect(senderName, senderPort, receiverName, receiverPort string) bool {
// var sender, receiver interface{}
// var ok bool
// sender, ok = n.procs[senderName]
// if !ok {
// return false
// }
// receiver, ok = n.procs[receiverName]
// if !ok {
// return false
// }
// res := unsetProcPort(sender, senderPort, true)
// res = res && unsetProcPort(receiver, receiverPort, false)
// return res
// }
// // Unsets an port of a given process
// func unsetProcPort(proc interface{}, portName string, isOut bool) bool {
// v := reflect.ValueOf(proc)
// var ch reflect.Value
// if v.Elem().FieldByName("Graph").IsValid() {
// if subnet, ok := v.Elem().FieldByName("Graph").Addr().Interface().(*Graph); ok {
// if isOut {
// ch = subnet.getOutPort(portName)
// } else {
// ch = subnet.getInPort(portName)
// }
// } else {
// return false
// }
// } else {
// ch = v.Elem().FieldByName(portName)
// }
// if !ch.IsValid() {
// return false
// }
// ch.Set(reflect.Zero(ch.Type()))
// return true
// }