-
Notifications
You must be signed in to change notification settings - Fork 0
/
cortical.go
186 lines (175 loc) · 4.98 KB
/
cortical.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Package cortical is a utility that handles websocket connexions and dispatch
// all the []byte received to the consumers called "cortex".
// It also get all the informations of the producers and sends them back to the websocket
//
// The context passed to the Cortex holds a uuid encoded into a string that is specific for each connection
// The UUID can be retrieved by calling
//
// uuid := ctx.Value(ContextKeyType(ContextKey))
package cortical
import (
"context"
"encoding/json"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"net/http"
)
// A Cortex shall implement an interface that returns two functions for getting and sending a byte array
type Cortex interface {
NewCortex(context.Context) (GetInfoFromCortexFunc, SendInfoToCortex)
}
// Cortical specifies how to upgrade an HTTP connection to a Websocket connection
// as well as the action to be performed on receive a []byte
type Cortical struct {
Upgrader websocket.Upgrader
Cortexes []Cortex
}
// ContextKeyType is the type of the key of the context
type ContextKeyType string
// ContextKey is the key name where the session is stored
const ContextKey = "uuid"
// GetInfoFromCortexFunc is the method implenented by a chatter to send objects
type GetInfoFromCortexFunc func(ctx context.Context) chan []byte
// SendInfoToCortex is the method implemented by a chatter to receive objects
type SendInfoToCortex func(context.Context, *[]byte)
type httpErr struct {
Msg string `json:"msg"`
Code int `json:"code"`
}
func handleErr(w http.ResponseWriter, err error, status int) {
msg, err := json.Marshal(&httpErr{
Msg: err.Error(),
Code: status,
})
if err != nil {
msg = []byte(err.Error())
}
http.Error(w, string(msg), status)
}
// ServeWS is the dispacher function
func (wsd *Cortical) ServeWS(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), ContextKeyType(ContextKey), uuid.New().String())
conn, err := wsd.Upgrader.Upgrade(w, r, nil)
if err != nil {
handleErr(w, err, http.StatusInternalServerError)
return
}
defer conn.Close()
var senders = make([]GetInfoFromCortexFunc, 0)
var receivers = make([]SendInfoToCortex, 0)
for _, cortex := range wsd.Cortexes {
snd, rcv := cortex.NewCortex(ctx)
if snd != nil {
senders = append(senders, snd)
}
if rcv != nil {
receivers = append(receivers, rcv)
}
}
rcvsNum := len(receivers)
sndrsNum := len(senders)
var stop []chan struct{}
for i := 0; i < sndrsNum+rcvsNum; i++ {
s := make(chan struct{})
stop = append(stop, s)
}
rcv := make(chan []byte, 1)
sendersChan := make([]<-chan []byte, sndrsNum)
chans := fanOut(rcv, rcvsNum, 1)
for i := 0; i < sndrsNum; i++ {
sendersChan[i] = senders[i](ctx)
}
for i := range chans {
receive(ctx, chans[i], stop[i+sndrsNum], receivers[i])
}
done := make(chan struct{}, 1)
send := merge(done, sendersChan...)
closed := make(chan struct{}, 2)
go func() {
for {
p := <-send
err := conn.WriteMessage(websocket.TextMessage, p)
if err != nil {
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake,
websocket.CloseNoStatusReceived) {
closed <- struct{}{}
return
}
if err == websocket.ErrCloseSent {
closed <- struct{}{}
return
}
// Temporary failure, nevermind
continue
}
}
}()
go func() {
for {
_, p, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake,
websocket.CloseNoStatusReceived) {
closed <- struct{}{}
return
}
if err == websocket.ErrCloseSent {
closed <- struct{}{}
return
}
// Temporary failure, nevermind
continue
}
rcv <- p
}
}()
<-closed
done <- struct{}{}
for i := 0; i < sndrsNum+rcvsNum; i++ {
stop[i] <- struct{}{}
}
}
func receive(ctx context.Context, msg <-chan []byte, stop chan struct{}, f SendInfoToCortex) {
go func() {
for {
select {
case b := <-msg:
f(ctx, &b)
case <-stop:
return
}
}
}()
}