-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
topic_proxy.go
310 lines (279 loc) · 10.3 KB
/
topic_proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/******************************************************************************
* Description :
* Topic in a cluster which serves as a local representation of the master
* topic hosted at another node.
*****************************************************************************/
package main
import (
"net/http"
"time"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store/types"
)
// runProxy is the event loop of a proxy topic, i.e. the local representation of
// a master topic hosted at another cluster node. Client requests (join, leave,
// broadcast, meta, session updates) are forwarded to the topic master; responses
// from the master arrive on t.proxy and are handled by proxyMasterResponse.
// The loop returns when a shutdown request is received on t.exit.
func (t *Topic) runProxy(hub *Hub) {
	// The kill timer is created in the stopped state. It is armed when the topic
	// is found to have no attached sessions (see handleProxyLeaveRequest and
	// proxyMasterResponse).
	killTimer := time.NewTimer(time.Hour)
	killTimer.Stop()

	for {
		select {
		case msg := <-t.reg:
			// Request to add a connection to this topic.
			if t.isInactive() {
				msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow()))
			} else if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, msg, t.name, msg.sess); err != nil {
				// Routing failed: report it to the session right away. On success the
				// response (ctrl message) is handled when it's received via the proxy channel.
				logs.Warn.Printf("proxy topic[%s]: route join request from proxy to master failed - %s", t.name, err)
				msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
			}
			if msg.sess.inflightReqs != nil {
				msg.sess.inflightReqs.Done()
			}

		case msg := <-t.unreg:
			// handleProxyLeaveRequest sets msg.init on requests which were not initiated
			// by the client, so remember the original value before calling it.
			clientInitiated := msg.init
			if !t.handleProxyLeaveRequest(msg, killTimer) {
				sid := "nil"
				if msg.sess != nil {
					sid = msg.sess.sid
				}
				logs.Warn.Printf("proxy topic[%s]: failed to update proxy topic state for leave request - sid %s", t.name, sid)
				if msg.sess != nil {
					msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
				}
			}
			if clientInitiated && msg.sess != nil && msg.sess.inflightReqs != nil {
				// Only a client initiated request is accounted for as in-flight.
				msg.sess.inflightReqs.Done()
			}

		case msg := <-t.clientMsg:
			// Content message intended for broadcasting to recipients.
			if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil {
				logs.Warn.Printf("topic proxy[%s]: route broadcast request from proxy to master failed - %s", t.name, err)
				msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
			}

		case msg := <-t.serverMsg:
			// Only {info} and {pres} server-side messages are expected here.
			if msg.Info != nil || msg.Pres != nil {
				globals.cluster.routeToTopicIntraCluster(t.name, msg, msg.sess)
			} else {
				// FIXME: should something be done here?
				logs.Err.Printf("ERROR!!! topic proxy[%s]: unexpected server-side message in proxy topic %s", t.name, msg.describe())
			}

		case msg := <-t.meta:
			// Request to get/set topic metadata.
			if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, msg, t.name, msg.sess); err != nil {
				logs.Warn.Printf("proxy topic[%s]: route meta request from proxy to master failed - %s", t.name, err)
				msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
			}

		case upd := <-t.supd:
			// Either an update to 'me' user agent from one of the sessions or
			// background session comes to foreground.
			req := ProxyReqMeUserAgent
			// A temporary session carries just the fields needed by the request.
			tmpSess := &Session{userAgent: upd.userAgent}
			if upd.sess != nil {
				// Subscribed user may not match session user. Find out who is subscribed.
				pssd, ok := t.sessions[upd.sess]
				if !ok {
					logs.Warn.Printf("proxy topic[%s]: sess update request from detached session - sid %s", t.name, upd.sess.sid)
					continue
				}
				req = ProxyReqBgSession
				tmpSess.uid = pssd.uid
				tmpSess.sid = upd.sess.sid
				tmpSess.userAgent = upd.sess.userAgent
			}
			if err := globals.cluster.routeToTopicMaster(req, nil, t.name, tmpSess); err != nil {
				logs.Warn.Printf("proxy topic[%s]: route sess update request from proxy to master failed - %s", t.name, err)
			}

		case msg := <-t.proxy:
			// Response from the topic master.
			t.proxyMasterResponse(msg, killTimer)

		case sd := <-t.exit:
			// Tell sessions to remove the topic.
			for s := range t.sessions {
				s.detachSession(t.name)
			}

			if err := globals.cluster.topicProxyGone(t.name); err != nil {
				logs.Warn.Printf("proxy topic[%s] shutdown: failed to notify master - %s", t.name, err)
			}

			// Report completion back to sender, if 'done' is not nil.
			if sd.done != nil {
				sd.done <- true
			}
			return

		case <-killTimer.C:
			// Topic timeout: ask the hub to unregister this topic.
			hub.unreg <- &topicUnreg{rcptTo: t.name}
		}
	}
}
// handleProxyLeaveRequest processes a session's request to leave this proxy topic:
// the session is detached locally and the leave request is forwarded to the topic master.
// The kill timer is started when no sessions remain attached.
// Returns true if the session was removed from the topic, false otherwise
// (including the case when the leaving user cannot be determined).
func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.Timer) bool {
	// Figure out which user is being detached: a client-initiated request names
	// the user explicitly, otherwise use the user recorded for the session.
	var uid types.Uid
	if msg.init {
		uid = types.ParseUserId(msg.AsUser)
	}
	if uid.IsZero() {
		attached, known := t.sessions[msg.sess]
		if !known {
			logs.Warn.Printf("proxy topic[%s]: leave request sent for unknown session", t.name)
			return false
		}
		uid = attached.uid
	}

	// Detach the session right away instead of waiting for the master's response:
	// by the time the response arrives the session may be already gone from the
	// session store and it would be impossible to find and remove it by its sid.
	pssd, removed := t.remSession(msg.sess, uid)
	if removed {
		msg.sess.delSub(t.name)
	}

	if !msg.init {
		// Turn the request into an explicit {leave} with the uid spelled out: the master
		// multiplex session needs to know which of its multiple hosted sessions to delete.
		msg.AsUser = uid.UserId()
		msg.Leave = &MsgClientLeave{}
		msg.init = true
	}

	// The Original field may be empty, e.g. when the session is terminating altogether.
	if msg.Original == "" {
		switch {
		case t.cat != types.TopicCatGrp || !t.isChan:
			// Not a channel topic.
			msg.Original = t.original(uid)
		case removed && pssd.isChanSub:
			// Channel topic, session was subscribed as a channel reader.
			msg.Original = types.GrpToChn(t.xoriginal)
		default:
			// Channel topic, ordinary subscription.
			msg.Original = t.xoriginal
		}
	}

	// A routing failure is logged only; it does not change the returned result.
	if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, msg, t.name, msg.sess); err != nil {
		logs.Warn.Printf("proxy topic[%s]: route leave request from proxy to master failed - %s", t.name, err)
	}

	// With the last session gone the idle countdown begins.
	if len(t.sessions) == 0 {
		killTimer.Reset(idleProxyTopicTimeout)
	}

	return removed
}
// proxyMasterResponse at proxy topic processes a master topic response to an earlier request.
// A response addressed to "*" is a broadcast to sessions attached to this topic; any other
// response is addressed to the session which issued the original request. killTimer is
// stopped or restarted depending on whether the topic still has attached sessions.
// A response without a server message is logged and ignored.
func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
	// Everything below reads msg.SrvMsg; without it there is nothing to process.
	if msg == nil || msg.SrvMsg == nil {
		logs.Warn.Printf("proxy topic[%s]: ignoring master response without a server message", t.name)
		return
	}

	// Kills topic after a period of inactivity.
	keepAlive := idleProxyTopicTimeout

	if msg.SrvMsg.Pres != nil && msg.SrvMsg.Pres.What == "acs" && msg.SrvMsg.Pres.Acs != nil {
		// If the server changed acs on this topic, update the internal state.
		t.updateAcsFromPresMsg(msg.SrvMsg.Pres)
	}

	if msg.OrigSid == "*" {
		// It is a broadcast.
		switch {
		case msg.SrvMsg.Pres != nil || msg.SrvMsg.Data != nil || msg.SrvMsg.Info != nil:
			// Regular broadcast.
			t.handleProxyBroadcast(msg.SrvMsg)
		case msg.SrvMsg.Ctrl != nil:
			// Ctrl broadcast. E.g. for user eviction.
			t.proxyCtrlBroadcast(msg.SrvMsg)
		default:
			// Nothing to deliver.
		}
		return
	}

	// Response to a request from a specific session.
	sess := globals.sessionStore.Get(msg.OrigSid)
	if sess == nil {
		logs.Warn.Printf("proxy topic[%s]: session %s not found; already terminated?", t.name, msg.OrigSid)
	}

	switch msg.OrigReqType {
	case ProxyReqJoin:
		if sess != nil && msg.SrvMsg.Ctrl != nil {
			// TODO: do we need to let the master topic know that the subscription is not longer valid
			// or is it already informed by the session when it terminated?

			// Subscription result.
			if msg.SrvMsg.Ctrl.Code < 300 {
				sess.sessionStoreLock.Lock()
				// Make sure the session isn't gone yet.
				if session := globals.sessionStore.Get(msg.OrigSid); session != nil {
					// Successful subscriptions.
					t.addSession(session, msg.SrvMsg.uid, types.IsChannel(msg.SrvMsg.Ctrl.Topic))
					session.addSub(t.name, &Subscription{
						broadcast: t.clientMsg,
						done:      t.unreg,
						meta:      t.meta,
						supd:      t.supd,
					})
				}
				sess.sessionStoreLock.Unlock()

				// NOTE(review): the timer is stopped even if the session was found gone
				// above and nothing was attached - confirm the topic cannot be left
				// without sessions and without a running kill timer.
				killTimer.Stop()
			} else if len(t.sessions) == 0 {
				// Subscription failed and nobody is attached: start the countdown.
				killTimer.Reset(keepAlive)
			}
		}

	case ProxyReqBroadcast, ProxyReqMeta, ProxyReqCall:
		// no processing

	case ProxyReqLeave:
		if msg.SrvMsg.Ctrl != nil {
			if msg.SrvMsg.Ctrl.Code < 300 && sess != nil {
				t.remSession(sess, sess.uid)
			}
			// All sessions are gone. Start the kill timer.
			if len(t.sessions) == 0 {
				killTimer.Reset(keepAlive)
			}
		}

	default:
		logs.Err.Printf("proxy topic[%s] received response referencing unexpected request type %d",
			t.name, msg.OrigReqType)
	}

	// Forward the master's response to the originating session, if it's still around.
	if sess != nil && !sess.queueOut(msg.SrvMsg) {
		logs.Err.Printf("proxy topic[%s]: timeout in sending response - sid %s", t.name, sess.sid)
	}
}
// handleProxyBroadcast delivers a Data, Info or Pres message received from the master
// to the sessions attached to this proxy topic. Broadcasts are dropped while
// the topic is inactive.
func (t *Topic) handleProxyBroadcast(msg *ServerComMessage) {
	// Topic is paused or being deleted: ignore the broadcast.
	if t.isInactive() {
		return
	}

	// Record the sequence ID carried by a data message.
	if data := msg.Data; data != nil {
		t.lastID = data.SeqId
	}

	t.broadcastToSessions(msg)
}
// proxyCtrlBroadcast handles a ctrl message broadcast by the master. Only the user
// eviction command is acted upon: every session attached on behalf of the evicted
// user is removed from the topic, detached and, unless it is the session named
// in msg.SkipSid, sent the ctrl message. Any other ctrl message is ignored.
func (t *Topic) proxyCtrlBroadcast(msg *ServerComMessage) {
	eviction := msg.Ctrl.Code == http.StatusResetContent && msg.Ctrl.Text == "evicted"
	if !eviction {
		return
	}

	// An eviction command must identify the user being evicted.
	if msg.uid.IsZero() {
		logs.Err.Panicf("proxy topic[%s]: proxy received evict message with empty uid", t.name)
	}

	for sess := range t.sessions {
		// Proxy topic may only have ordinary sessions. No multiplexing or proxy sessions here.
		_, removed := t.remSession(sess, msg.uid)
		if !removed {
			continue
		}
		sess.detachSession(t.name)
		if sess.sid != msg.SkipSid {
			sess.queueOut(msg)
		}
	}
}
// updateAcsFromPresMsg applies the access mode change described by `pres` to the
// record of the user named in pres.Src in the topic's perUser map. The record
// is left untouched if the user ID is invalid or either mutation fails.
func (t *Topic) updateAcsFromPresMsg(pres *MsgServerPres) {
	uid := types.ParseUserId(pres.Src)
	if uid.IsZero() {
		// The warning is skipped for 'me' topics.
		if t.cat != types.TopicCatMe {
			logs.Warn.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src)
		}
		return
	}

	// A missing entry yields a blank record, otherwise the existing values are used.
	userData := t.perUser[uid]

	if err := userData.modeWant.ApplyMutation(pres.Acs.Want); err != nil {
		logs.Warn.Printf("proxy topic[%s]: could not process acs change - want: %s", t.name, err)
		return
	}
	if err := userData.modeGiven.ApplyMutation(pres.Acs.Given); err != nil {
		logs.Warn.Printf("proxy topic[%s]: could not process acs change - given: %s", t.name, err)
		return
	}

	// Store the record back: updates the existing entry or adds a new one.
	t.perUser[uid] = userData
}