-
-
Notifications
You must be signed in to change notification settings - Fork 94
/
events.go
490 lines (422 loc) · 20.2 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
package centrifuge
import (
"context"
"time"
"github.com/centrifugal/protocol"
)
// ConnectEvent contains fields related to the connecting event, i.e. the moment
// when a server has received a Connect protocol command from a client.
type ConnectEvent struct {
// ClientID is the ID that was generated by the library for the client connection.
ClientID string
// Token received from the client as part of the Connect command.
Token string
// Data received from the client as part of the Connect command.
Data []byte
// Name can contain the client name if it was provided on connect.
Name string
// Version can contain the client version if it was provided on connect.
Version string
// Transport contains information about the transport used by the client.
Transport TransportInfo
// Channels is a list of channels a client wants to subscribe to
// (server-side). It's just a way for a client to provide this list.
// The server should use ConnectReply.Subscriptions to tell Centrifuge
// the final list of server-side subscriptions for a connection, which
// can differ from the Channels list.
Channels []string
}
// ConnectReply contains the reaction to a ConnectEvent.
type ConnectReply struct {
// Context allows returning a modified context.
Context context.Context
// Credentials should be set if the app wants to authenticate the connection.
// This field is optional since auth Credentials could be set through
// HTTP middleware.
Credentials *Credentials
// Data allows setting custom data in the connect reply.
Data []byte
// Subscriptions map contains channels to subscribe the connection to on the server side.
Subscriptions map[string]SubscribeOptions
// ClientSideRefresh tells the library to use client-side refresh logic,
// i.e. the client sends refresh commands with a new connection token. If not
// set then the server-side refresh mechanism will be used.
ClientSideRefresh bool
// Storage can be used to fill the initial connection storage during connecting.
// This data may then be accessed/modified/replaced later during the Client's lifetime
// via a Client.AcquireStorage() call. This API is EXPERIMENTAL.
Storage map[string]any
// MaxMessagesInFrame is the maximum number of messages (replies and pushes) which
// the Centrifuge Client message writer will collect from the client's queue before
// sending to the connection. By default, it's 16. Use -1 to disable the limit.
MaxMessagesInFrame int
// WriteDelay is the time Centrifuge will try to collect messages inside the message writer
// loop before sending them towards this connection. Enabling WriteDelay may reduce CPU usage
// of both server and client in case of a high message rate inside individual connections. The
// reduction happens due to the smaller number of system calls to execute. Enabling WriteDelay
// limits the maximum throughput of messages towards the connection which may be achieved.
// For example, if WriteDelay is 100ms then the max throughput per second will be
// (1000 / 100) * MaxMessagesInFrame (16 by default), i.e. 160 messages per second. This
// should be more than enough for target Centrifuge use cases (frontend apps) though.
WriteDelay time.Duration
// ReplyWithoutQueue, when enabled, forces Centrifuge to avoid using the Client write
// queue for sending replies to commands for this connection. Replies are sent directly to
// the Client's transport, thus avoiding possible delays caused by the writer loop, but
// replies lose the chance to be batched.
ReplyWithoutQueue bool
// QueueInitialCap sets an initial capacity for the client's message queue. The size of the
// queue can grow further, but won't be reduced below QueueInitialCap. By default, it's 2.
QueueInitialCap int
// PingPongConfig, if set, overrides the Transport's PingPongConfig, which makes it possible
// to set the ping/pong interval for an individual client.
PingPongConfig *PingPongConfig
}
// ConnectingHandler is called when a new client authenticates on the server.
// It receives a context and the ConnectEvent, and returns a ConnectReply or an error.
type ConnectingHandler func(context.Context, ConnectEvent) (ConnectReply, error)
// ConnectHandler is called when a client has connected to the server and is
// ready to communicate. It receives the connected *Client.
type ConnectHandler func(*Client)
// RefreshEvent contains fields related to the connection refresh event.
type RefreshEvent struct {
// ClientSideRefresh is true for a refresh initiated by the client-side refresh workflow.
ClientSideRefresh bool
// Token will only be set when the client-side refresh mechanism is used.
Token string
}
// RefreshReply contains fields determining the reaction to a refresh event.
type RefreshReply struct {
// Expired tells Centrifuge that the connection expired. In this case the
// connection will be closed with DisconnectExpired.
Expired bool
// ExpireAt defines the time in the future when the connection should expire,
// zero value means no expiration.
ExpireAt int64
// Info allows modifying connection information,
// zero value means no modification of the current connection Info.
Info []byte
}
// RefreshCallback should be called as soon as the handler decides what to do
// with the connection refresh event. It accepts a RefreshReply and an error.
type RefreshCallback func(RefreshReply, error)
// RefreshHandler is called when it's time to validate a client connection and
// update its expiration time if the connection is still actual.
//
// Centrifuge library supports two ways of refreshing a connection: client-side
// and server-side.
//
// The default mechanism is server-side: as soon as the refresh handler is set
// and the connection expiration time comes (by timer), the refresh handler
// will be called.
//
// If ClientSideRefresh in ConnectReply inside ConnectingHandler is set to true
// then the library uses the client-side refresh mechanism. In this case the
// library relies on Refresh commands sent from the client periodically to
// refresh the connection. A Refresh command contains an updated connection token.
type RefreshHandler func(RefreshEvent, RefreshCallback)
// AliveHandler is called periodically while the connection is alive. This is a
// helper to do periodic things which can tolerate some approximation in time.
// The callback runs every ClientPresenceUpdateInterval and can save you a timer.
type AliveHandler func()
// UnsubscribeEvent contains fields related to the unsubscribe event.
type UnsubscribeEvent struct {
// Channel the client unsubscribed from.
Channel string
// ServerSide is set to true for server-side subscription unsubscribe events.
ServerSide bool
// Unsubscribe (embedded) identifies the source of the unsubscribe, i.e. why the
// unsubscribed event happened.
Unsubscribe
// Disconnect can be additionally set to identify the reason of disconnect when
// Unsubscribe.Code is UnsubscribeCodeDisconnect, i.e. when the unsubscribe was caused
// by a client disconnection process. Otherwise, it's nil.
Disconnect *Disconnect
}
// UnsubscribeHandler is called when a client has unsubscribed from a channel.
// It receives the UnsubscribeEvent describing the unsubscribe.
type UnsubscribeHandler func(UnsubscribeEvent)
// DisconnectEvent contains fields related to the disconnect event.
type DisconnectEvent struct {
// Disconnect (embedded) is a Disconnect object which identifies the code and
// reason of the disconnect process. When the disconnect was not initiated by
// a server this is always DisconnectConnectionClosed.
Disconnect
}
// DisconnectHandler is called when a client disconnects from the server. The
// important thing to remember is that you should not rely entirely on this
// handler to clean up non-expiring resources (in your database for example).
// In case of any non-graceful node shutdown (kill -9, process crash, machine
// lost) the disconnect handler will never be called, so you can end up with
// stale data.
type DisconnectHandler func(DisconnectEvent)
// SubscribeEvent contains fields related to the subscribe event.
type SubscribeEvent struct {
// Channel the client wants to subscribe to.
Channel string
// Token will only be set for token channels. It is the task of the application
// to check that a subscription to a channel has a valid token.
Token string
// Data received from the client as part of the Subscribe command.
Data []byte
// Positioned is true when the Client wants to create a subscription with the positioned property.
Positioned bool
// Recoverable is true when the Client wants to create a subscription with the recoverable property.
Recoverable bool
// JoinLeave is true when the Client wants to receive join/leave messages.
JoinLeave bool
}
// SubscribeCallback should be called as soon as the handler decides what to do
// with the connection subscribe event. It accepts a SubscribeReply and an error.
type SubscribeCallback func(SubscribeReply, error)
// SubscribeReply contains fields determining the reaction to a subscribe event.
type SubscribeReply struct {
// Options to control the subscription.
Options SubscribeOptions
// ClientSideRefresh tells the library to use client-side refresh logic, i.e.
// SubRefresh commands with a new Subscription Token are sent. If not set then
// the server-side SubRefresh handler will be used.
ClientSideRefresh bool
// SubscriptionReady channel, if provided, will be closed as soon as Centrifuge
// has written the subscribe reply to the connection, so it's possible to start
// writing publications into a channel using the experimental
// Client.WritePublication method.
// In the usual flow you don't need to provide this channel at all.
// This is EXPERIMENTAL and may be removed in the future.
SubscriptionReady chan struct{}
}
// SubscribeHandler is called when a client wants to subscribe to a channel.
// It receives the SubscribeEvent and a SubscribeCallback to report the decision.
type SubscribeHandler func(SubscribeEvent, SubscribeCallback)
// PublishEvent contains fields related to the publish event. Note that this
// event is issued before the actual publish to the Broker, so the handler has
// an option to reject the publication by returning an error.
type PublishEvent struct {
// Channel the client wants to publish data to.
Channel string
// Data the client wants to publish.
Data []byte
// ClientInfo about the client connection.
ClientInfo *ClientInfo
}
// PublishReply contains fields determining the result of a publish.
type PublishReply struct {
// Options to control the publication.
Options PublishOptions
// Result, if set, tells Centrifuge that the message was already published to
// the channel by the handler code. In this case Centrifuge won't try to publish
// into the channel again after the handler returned PublishReply. This can be
// useful if you need to know the new Publication offset in your code, or you
// want to make sure the message was successfully published to the Broker on the
// server side (otherwise only the client will get an error).
Result *PublishResult
}
// PublishCallback should be called with a PublishReply or an error once the
// handler has decided on the publish outcome.
type PublishCallback func(PublishReply, error)
// PublishHandler is called when a client publishes into a channel. It receives
// the PublishEvent and a PublishCallback to report the result.
type PublishHandler func(PublishEvent, PublishCallback)
// SubRefreshEvent contains fields related to the subscription refresh event.
type SubRefreshEvent struct {
// ClientSideRefresh is true for a refresh initiated by the client-side
// subscription refresh workflow.
ClientSideRefresh bool
// Channel to which the SubRefreshEvent belongs.
Channel string
// Token will only be set when the client-side subscription refresh mechanism is used.
Token string
}
// SubRefreshReply contains fields determining the reaction to a
// subscription refresh event.
type SubRefreshReply struct {
// Expired tells Centrifuge that the subscription expired. In this case the
// connection will be closed with DisconnectExpired.
Expired bool
// ExpireAt is a new Unix time of expiration. Zero value means no expiration.
ExpireAt int64
// Info is a new channel-scope info. Zero value means do not change the previous one.
Info []byte
}
// SubRefreshCallback should be called as soon as the handler decides what to do
// with the connection SubRefreshEvent. It accepts a SubRefreshReply and an error.
type SubRefreshCallback func(SubRefreshReply, error)
// SubRefreshHandler is called when it's time to validate a client subscription
// to a channel and update its state if needed.
//
// If ClientSideRefresh in SubscribeReply inside SubscribeHandler is set to true
// then the library uses the client-side subscription refresh mechanism. In this
// case the library relies on SubRefresh commands sent from the client periodically
// to refresh the subscription. A SubRefresh command contains an updated
// subscription token.
type SubRefreshHandler func(SubRefreshEvent, SubRefreshCallback)
// RPCEvent contains fields related to an RPC request.
type RPCEvent struct {
// Method is an optional string that contains the RPC method name the client
// wants to call. By default, clients send RPC without any method set.
Method string
// Data contains the untouched RPC payload.
Data []byte
}
// RPCReply contains fields determining the reaction to an RPC request.
type RPCReply struct {
// Data to return to the client in the RPC reply.
Data []byte
}
// RPCCallback should be called as soon as the handler decides what to do
// with the connection RPCEvent. It accepts an RPCReply and an error.
type RPCCallback func(RPCReply, error)
// RPCHandler must handle an incoming RPC command from a client. It receives the
// RPCEvent and an RPCCallback to report the reply or an error.
type RPCHandler func(RPCEvent, RPCCallback)
// MessageEvent contains fields related to a message request.
type MessageEvent struct {
// Data contains the untouched message payload.
Data []byte
}
// MessageHandler must handle an incoming async message from a client. With it
// Centrifuge feels similar to a pure WebSocket API. In general though, we
// recommend using RPC where possible to send data to a server from a client,
// as it provides better flow control.
type MessageHandler func(MessageEvent)
// PresenceEvent contains the channel the presence operation was called for.
type PresenceEvent struct {
// Channel the presence operation was called for.
Channel string
}
// PresenceReply contains fields determining the reaction to a presence request.
type PresenceReply struct {
// Result is the PresenceResult supplied by the handler.
// NOTE(review): semantics of a nil Result are not visible here; confirm against the caller.
Result *PresenceResult
}
// PresenceCallback should be called with a PresenceReply or an error once the
// handler has decided how to react to the presence request.
type PresenceCallback func(PresenceReply, error)
// PresenceHandler is called when a presence request is received from a client.
// It receives the PresenceEvent and a PresenceCallback to report the result.
type PresenceHandler func(PresenceEvent, PresenceCallback)
// PresenceStatsEvent contains the channel the presence stats operation was called for.
type PresenceStatsEvent struct {
// Channel the presence stats operation was called for.
Channel string
}
// PresenceStatsReply contains fields determining the reaction to a presence
// stats request.
type PresenceStatsReply struct {
// Result is the PresenceStatsResult supplied by the handler.
// NOTE(review): semantics of a nil Result are not visible here; confirm against the caller.
Result *PresenceStatsResult
}
// PresenceStatsCallback should be called with a PresenceStatsReply or an error
// once the handler has decided how to react to the presence stats request.
type PresenceStatsCallback func(PresenceStatsReply, error)
// PresenceStatsHandler must handle an incoming presence stats command from a
// client. It receives the PresenceStatsEvent and a PresenceStatsCallback to
// report the result.
type PresenceStatsHandler func(PresenceStatsEvent, PresenceStatsCallback)
// HistoryEvent contains the channel the history operation was called for,
// together with the filter of the request.
type HistoryEvent struct {
// Channel the history operation was called for.
Channel string
// Filter is the HistoryFilter that accompanies the history request.
// NOTE(review): filter semantics are defined by HistoryFilter, not visible here.
Filter HistoryFilter
}
// HistoryReply contains fields determining the reaction to a history request.
type HistoryReply struct {
// Result is the HistoryResult supplied by the handler.
// NOTE(review): semantics of a nil Result are not visible here; confirm against the caller.
Result *HistoryResult
}
// HistoryCallback should be called with a HistoryReply or an error once the
// handler has decided how to react to the history request.
type HistoryCallback func(HistoryReply, error)
// HistoryHandler must handle an incoming history command from a client. It
// receives the HistoryEvent and a HistoryCallback to report the result.
type HistoryHandler func(HistoryEvent, HistoryCallback)
// StateSnapshotHandler must return a copy of the current client's internal
// state (as any) or an error. Returning a copy is important to avoid data races.
type StateSnapshotHandler func() (any, error)
// CacheEmptyEvent is issued when recovery mode is used but Centrifuge can't
// find a Publication in history to recover from. This event allows the
// application to decide what to do in this case - it's possible to populate
// the cache by sending actual data to a channel.
type CacheEmptyEvent struct {
// Channel the event was issued for.
Channel string
}
// CacheEmptyReply contains fields determining the reaction to a cache empty event.
type CacheEmptyReply struct {
// Populated, when set to true, tells Centrifuge that the cache was populated.
// In that case Centrifuge will try to recover the missed Publication from
// history one more time.
Populated bool
}
// CacheEmptyHandler is the cache empty handler function type. It receives a
// CacheEmptyEvent and returns a CacheEmptyReply or an error.
type CacheEmptyHandler func(CacheEmptyEvent) (CacheEmptyReply, error)
// SurveyEvent carries the Op and Data of a survey.
type SurveyEvent struct {
// Op of the survey.
Op string
// Data of the survey.
Data []byte
}
// SurveyReply contains survey reply fields.
type SurveyReply struct {
// Code of the survey reply.
// NOTE(review): meaning of specific code values is not visible here; confirm with survey callers.
Code uint32
// Data of the survey reply.
Data []byte
}
// SurveyCallback should be called with a SurveyReply as soon as the survey is completed.
type SurveyCallback func(SurveyReply)
// SurveyHandler is the survey handler function type. It receives a SurveyEvent
// and a SurveyCallback to report the SurveyReply.
type SurveyHandler func(SurveyEvent, SurveyCallback)
// NotificationEvent carries the Op and Data of a notification.
type NotificationEvent struct {
// FromNodeID is presumably the ID of the node the notification came from.
// TODO confirm against the code that constructs this event.
FromNodeID string
// Op of the notification.
Op string
// Data of the notification.
Data []byte
}
// NotificationHandler allows handling notifications. It receives the
// NotificationEvent and returns nothing.
type NotificationHandler func(NotificationEvent)
// NodeInfoSendReply can modify the sending of a Node control frame in some ways.
type NodeInfoSendReply struct {
// Data allows attaching arbitrary data to the control node frame which is
// published by each Node periodically, so it will be available in the
// result of a Node.Info call for the current Node description. Keep this
// data reasonably small.
Data []byte
}
// NodeInfoSendHandler is called every time the control node frame is published
// and allows modifying the sending of the Node control frame. Currently this
// means attaching arbitrary data to it. See NodeInfoSendReply.
type NodeInfoSendHandler func() NodeInfoSendReply
// TransportWriteEvent is issued just before sending data into the client
// connection. The event is triggered from inside each client's message queue
// consumer, so it should not directly affect Hub broadcast latencies.
type TransportWriteEvent struct {
// Data represents a single Centrifuge protocol message which is going to be sent
// into the connection. For unidirectional transports this is an encoded protocol.Push
// type, for bidirectional transports this is an encoded protocol.Reply type.
Data []byte
// Channel will be set if the TransportWriteEvent relates to some channel.
Channel string
// FrameType tells what is being sent inside Data.
FrameType protocol.FrameType
}
// TransportWriteHandler is called just before writing data to the Transport.
// At this moment the application can skip sending data to a client by returning
// false from the handler. The main purpose of this handler is not message
// filtering based on data content but rather tracing.
type TransportWriteHandler func(*Client, TransportWriteEvent) bool
// CommandReadEvent contains the protocol.Command processed by the Client. The
// Command type and its fields in the event MAY BE POOLED by Centrifuge, so code
// which wants to use the Command AFTER the CommandReadHandler returns MUST
// MAKE A COPY.
type CommandReadEvent struct {
// Command which was read from the connection. May be pooled - see the comment of CommandReadEvent.
Command *protocol.Command
// CommandSize is the size of the command in bytes in its protocol representation.
CommandSize int
}
// CommandReadHandler is a callback which is called before the Client processes
// a protocol.Command read from the connection. Return an error if you want to
// prevent command execution.
// Also, carefully read the docs for CommandReadEvent to avoid possible bugs.
type CommandReadHandler func(*Client, CommandReadEvent) error
// CommandProcessedEvent contains the protocol.Command processed by the Client.
// The Command and Reply types and their fields in the event MAY BE POOLED by
// Centrifuge, so code which wants to use them AFTER the CommandProcessedHandler
// returns MUST MAKE A COPY.
type CommandProcessedEvent struct {
// Command which was processed. May be pooled - see the comment of CommandProcessedEvent.
Command *protocol.Command
// Error may be set to non-nil if Command processing resulted in an error.
Error error
// Reply to the command. Reply may be pooled - see the comment of CommandProcessedEvent.
// This Reply may be nil, for example in the following cases:
// 1. For a Send command, since send commands do not have replies.
// 2. When command processing resulted in disconnection of the client without sending a reply.
// 3. When a unidirectional transport connects (Centrifuge creates the Connect Command artificially
// with id: 1 and never sends replies to the unidirectional transport, only pushes).
Reply *protocol.Reply
// Started is the time the command was passed to the Client for processing.
Started time.Time
}
// newCommandProcessedEvent assembles a CommandProcessedEvent from the processed
// command, the processing error (may be nil), the reply (may be nil) and the
// time at which processing started. Arguments are stored as-is, no copies are made.
func newCommandProcessedEvent(command *protocol.Command, err error, reply *protocol.Reply, started time.Time) CommandProcessedEvent {
	var event CommandProcessedEvent
	event.Command = command
	event.Error = err
	event.Reply = reply
	event.Started = started
	return event
}
// CommandProcessedHandler is a callback which is called after the Client has
// processed a protocol.Command. This exists mostly for real-time connection
// tracing purposes. CommandProcessedHandler may be called after the corresponding
// Reply has been written to the connection and TransportWriteHandler called. For
// tracing purposes this seems tolerable, as commands and replies may be matched by id.
// Also, carefully read the docs for CommandProcessedEvent to avoid possible bugs.
type CommandProcessedHandler func(*Client, CommandProcessedEvent)