spark.js (forked from primus/primus)
'use strict';
var ParserError = require('./errors').ParserError
, log = require('diagnostics')('primus:spark')
, parse = require('querystring').parse
, forwarded = require('forwarded-for')
, Ultron = require('ultron')
, fuse = require('fusing')
, yeast = require('yeast')
, u2028 = /\u2028/g
, u2029 = /\u2029/g;
/**
* The Spark is an indefinable, indescribable energy or soul of a transformer
* which can be used to create new transformers. In our case, it's a simple
* wrapping interface.
*
* @constructor
* @param {Primus} primus Reference to the Primus server. (Set using .bind)
* @param {Object} headers The request headers for this connection.
* @param {Object} address The object that holds the remoteAddress and port.
* @param {Object} query The query string of the request.
* @param {String} id An optional id of the socket, or we will generate one.
* @param {Request} request The HTTP Request instance that initialised the spark.
* @api public
*/
function Spark(primus, headers, address, query, id, request) {
this.fuse();
var writable = this.writable
, spark = this;
query = query || {};
id = id || yeast();
headers = headers || {};
address = address || {};
request = request || headers['primus::req::backup'];
writable('id', id); // Unique id for socket.
writable('primus', primus); // References to Primus.
writable('remote', address); // The remote address location.
writable('headers', headers); // The request headers.
writable('request', request); // Reference to an HTTP request.
writable('writable', true); // Silly stream compatibility.
writable('readable', true); // Silly stream compatibility.
writable('queue', []); // Data queue for data events.
writable('query', query); // The query string.
writable('timeout', null); // Heartbeat timeout.
writable('ultron', new Ultron(this)); // Our event listening cleanup.
//
// Parse our query string.
//
if ('string' === typeof this.query) {
this.query = parse(this.query);
}
this.heartbeat().__initialise.forEach(function execute(initialise) {
initialise.call(spark);
});
}
fuse(Spark, require('stream'), { merge: false, mixin: false });
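//
// Usage sketch (not part of the original source): application code normally
// never constructs a Spark directly. The transformer creates one per incoming
// connection and Primus hands it out through the `connection` event. Assuming
// a configured `primus` server instance:
//
//   primus.on('connection', function connection(spark) {
//     console.log('new connection %s from %j', spark.id, spark.address);
//     spark.write('hello');
//   });
//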
//
// Internal readyStates to prevent writes against closed sockets.
//
Spark.OPENING = 1; // Only here for primus.js readyState number compatibility.
Spark.CLOSED = 2; // The connection is closed.
Spark.OPEN = 3; // The connection is open.
//
// Make sure that we emit `readyState` change events when a new readyState is
// checked. This way plugins can correctly act according to this.
//
Spark.readable('readyState', {
get: function get() {
return this.__readyState;
},
set: function set(readyState) {
if (this.__readyState === readyState) return readyState;
this.__readyState = readyState;
this.emit('readyStateChange');
return readyState;
}
}, true);
Spark.writable('__readyState', Spark.OPEN);
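//
// Example sketch (not in the original source): because the setter above emits
// `readyStateChange`, plugins can react to state transitions instead of
// polling. Assuming `spark` was obtained from the `connection` event and the
// constructor is reachable (it is typically also exposed as `primus.Spark`):
//
//   spark.on('readyStateChange', function change() {
//     if (spark.readyState === Spark.CLOSED) {
//       // tear down any per-connection resources here
//     }
//   });
//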
//
// Lazy parse interface for IP address information. As this information is not
// always needed, we defer parsing until it's actually accessed.
//
Spark.get('address', function address() {
return this.request.forwarded || forwarded(this.remote, this.headers, this.primus.whitelist);
});
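//
// Example sketch: the address is resolved through the `forwarded-for` module,
// so proxy headers are taken into account, and only the first access pays the
// parsing cost. Illustrative use (the exact fields on the returned object,
// such as `ip` and `port`, come from that module):
//
//   primus.on('connection', function connection(spark) {
//     console.log('connection from %s:%s', spark.address.ip, spark.address.port);
//   });
//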
/**
* Set a timer to forcibly disconnect the spark if no data is received from the
* client within the given timeout.
*
* @api private
*/
Spark.readable('heartbeat', function heartbeat() {
var spark = this;
clearTimeout(spark.timeout);
if (!spark.primus.timeout) return spark;
log('setting new heartbeat timeout for %s', spark.id);
this.timeout = setTimeout(function timeout() {
//
// Set reconnect to true so we're not sending a `primus::server::close`
// packet.
//
spark.end(undefined, { reconnect: true });
}, spark.primus.timeout);
// Emit an event so the application can know the timer has been reset.
spark.emit('heartbeat');
return this;
});
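//
// Example sketch: every reset of the heartbeat timer also emits `heartbeat`,
// which can double as a cheap liveness signal. A hypothetical bookkeeping
// example, assuming the server was configured with a heartbeat timeout so
// `primus.timeout` is truthy:
//
//   spark.on('heartbeat', function alive() {
//     lastSeen[spark.id] = Date.now(); // `lastSeen` is a hypothetical map
//   });
//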
/**
* Checks if the given event is a reserved event emitted by Primus.
*
* @param {String} evt The event name.
* @returns {Boolean}
* @api public
*/
Spark.readable('reserved', function reserved(evt) {
return (/^(incoming|outgoing)::/).test(evt)
|| evt in reserved.events;
});
/**
* The actual events that are used by the Spark.
*
* @type {Object}
* @api public
*/
Spark.prototype.reserved.events = {
readyStateChange: 1,
heartbeat: 1,
error: 1,
data: 1,
end: 1
};
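//
// Example sketch: code that forwards arbitrary, user-supplied event names over
// a spark can use `reserved()` to avoid clashing with the internal events
// listed above. A hypothetical guard:
//
//   function safeEmit(spark, name, payload) {
//     if (spark.reserved(name)) return false;
//     return spark.emit(name, payload);
//   }
//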
/**
* Allows for adding initialise listeners without people overriding our default
* initialiser. If they are feeling adventurous and really want to hack it
* up, they can remove it from the __initialise array.
*
* @returns {Function} The last added initialise hook.
* @api public
*/
Spark.readable('initialise', {
get: function get() {
return this.__initialise[this.__initialise.length - 1];
},
set: function set(initialise) {
if ('function' === typeof initialise) this.__initialise.push(initialise);
}
}, true);
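//
// Example sketch: because the setter pushes instead of replacing, extra set-up
// logic can be appended without losing the default hook. One plausible way to
// register such a hook, assuming access to the constructor (typically exposed
// as `primus.Spark`):
//
//   primus.Spark.prototype.initialise = function customSetup() {
//     // runs with `this` bound to each new spark, after the built-in hook
//   };
//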
/**
* Attach hooks and automatically announce a new connection.
*
* @type {Array}
* @api private
*/
Spark.readable('__initialise', [function initialise() {
var primus = this.primus
, ultron = this.ultron
, spark = this;
//
// Prevent double initialization of the spark. If we already have an
// `incoming::data` handler we assume that all other cases are handled as well.
//
if (this.listeners('incoming::data').length) {
return log('already has incoming::data listeners, bailing out');
}
//
// We've received new data from our client, decode and emit it.
//
ultron.on('incoming::data', function message(raw) {
//
// New data has arrived, so we're certain that the connection is still alive
// and it's safe to restart the heartbeat sequence.
//
spark.heartbeat();
primus.decoder.call(spark, raw, function decoding(err, data) {
//
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
//
if (err) {
log('failed to decode the incoming data for %s', spark.id);
return new ParserError('Failed to decode incoming data: '+ err.message, spark, err);
}
//
// Handle "primus::" prefixed protocol messages.
//
if (spark.protocol(data)) return;
spark.transforms(primus, spark, 'incoming', data, raw);
});
});
//
// We've received a ping event. This is fired upon receipt of a WebSocket
// Ping frame or a `primus::ping::<timestamp>` message. In the former case
// the listener is called without arguments and we should only reset the
// heartbeat.
//
ultron.on('incoming::ping', function ping(time) {
if (time === undefined) return spark.heartbeat();
spark.emit('outgoing::pong', time);
spark._write('primus::pong::'+ time);
});
//
// The client has disconnected.
//
ultron.on('incoming::end', function disconnect() {
//
// The socket is closed, sending data over it will throw an error.
//
log('transformer closed connection for %s', spark.id);
spark.end(undefined, { reconnect: true });
});
ultron.on('incoming::error', function error(err) {
//
// Ensure that the error we emit is always an Error instance. There are
// transformers that used to emit only strings. A string is not an Error.
//
if ('string' === typeof err) {
err = new Error(err);
}
if (spark.listeners('error').length) spark.emit('error', err);
spark.primus.emit('log', 'error', err);
log('transformer received error `%s` for %s', err.message, spark.id);
spark.end();
});
//
// End is triggered by both incoming and outgoing events.
//
ultron.on('end', function end() {
clearTimeout(spark.timeout);
primus.emit('disconnection', spark);
//
// We are most likely the first `end` listener in the EventEmitter stack,
// which makes our callback the first to be executed. If we instantly deleted
// properties, our users could no longer access them in their own `end`
// listeners. So if they need to un-register something based on the spark.id,
// that would be impossible. Therefore we delay our deletion by a
// non-scientific amount of milliseconds to give people some time to use
// these references one last time.
//
setTimeout(function timeout() {
log('releasing references from our spark object for %s', spark.id);
//
// Release references.
// @TODO also remove the references that were set by users.
//
[
'id', 'primus', 'remote', 'headers', 'request', 'query'
].forEach(function each(key) {
delete spark[key];
});
}, 10);
});
//
// Announce a new connection. This allows the transformers to change or listen
// to events before we announce it.
//
process.nextTick(function tick() {
primus.asyncemit('connection', spark, function damn(err) {
if (!err) {
if (spark.queue) spark.queue.forEach(function each(packet) {
spark.emit('data', packet.data, packet.raw);
});
spark.queue = null;
return;
}
spark.emit('incoming::error', err);
});
});
}]);
/**
* Execute the set of message transformers from Primus on the incoming or
* outgoing message.
* This function and its content should be in sync with Primus#transforms in
* primus.js.
*
* @param {Primus} primus Reference to the Primus instance with message transformers.
* @param {Spark|Primus} connection Connection that receives or sends data.
* @param {String} type The type of message, 'incoming' or 'outgoing'.
* @param {Mixed} data The data to send or that has been received.
* @param {String} raw The raw encoded data.
* @returns {Spark}
* @api public
*/
Spark.readable('transforms', function transforms(primus, connection, type, data, raw) {
var packet = { data: data, raw: raw }
, fns = primus.transformers[type];
//
// Iterate in series over the message transformers so we can allow optional
// asynchronous execution of message transformers which could for example
// retrieve additional data from the server, do extra decoding or even
// message validation.
//
(function transform(index, done) {
var transformer = fns[index++];
if (!transformer) return done();
if (1 === transformer.length) {
if (false === transformer.call(connection, packet)) {
//
// When an incoming transformer returns false it means the message is
// being handled by the transformer and we should not emit the `data`
// event.
//
return;
}
return transform(index, done);
}
transformer.call(connection, packet, function finished(err, arg) {
if (err) return connection.emit('error', err);
if (false === arg) return;
transform(index, done);
});
}(0, function done() {
//
// We always emit 2 arguments for the data event, the first argument is the
// parsed data and the second argument is the raw string that we received.
// This allows you, for example, to do some validation on the parsed data
// and then save the raw string in your database without the stringify
// overhead.
//
if ('incoming' === type) {
//
// This is a pretty bad edge case: it's possible that the async version of
// the `connection` event listener takes so long that we cannot assign
// `data` handlers and we are already receiving data as the connection is
// already established. In this edge case we need to queue the data and
// pass it to the data event once we're listening.
//
if (connection.queue) return connection.queue.push(packet);
return connection.emit('data', packet.data, packet.raw);
}
connection._write(packet.data);
}));
return this;
});
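//
// Example sketch: the `fns` arrays above are populated through the public
// `primus.transform()` API. A synchronous incoming transformer that rewrites
// the packet (returning `false` would swallow it, as handled above) could
// look roughly like this:
//
//   primus.transform('incoming', function incoming(packet) {
//     if (!packet.data) return false;          // handled, no `data` event
//     packet.data = { received: Date.now(), payload: packet.data };
//   });
//
// The asynchronous form takes `(packet, next)`, which is what the
// `transformer.length` check above distinguishes.
//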
/**
* Really dead simple protocol parser. We simply assume that every message that
* is prefixed with `primus::` could be used as some sort of protocol definition
* for Primus.
*
* @param {String} msg The data.
* @returns {Boolean} Is a protocol message.
* @api private
*/
Spark.readable('protocol', function protocol(msg) {
if (
'string' !== typeof msg
|| msg.indexOf('primus::') !== 0
) return false;
var last = msg.indexOf(':', 8)
, value = msg.slice(last + 2);
switch (msg.slice(8, last)) {
case 'ping':
this.emit('incoming::ping', +value);
break;
case 'id':
this._write('primus::id::'+ this.id);
break;
//
// Unknown protocol command; somebody is probably sending their own
// `primus::` prefixed messages.
//
default:
log('message `%s` was prefixed with primus:: but not supported', msg);
return false;
}
log('processed a primus protocol message `%s`', msg);
return true;
});
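//
// Example sketch of the wire format recognised above (values illustrative):
//
//   'primus::ping::1475424772712'   -> emits `incoming::ping` with the number
//   'primus::id::'                  -> answered with 'primus::id::<spark.id>'
//
// Anything else starting with `primus::` is logged and treated as a regular
// message.
//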
/**
* Send a new message to a given spark.
*
* @param {Mixed} data The data that needs to be written.
* @returns {Boolean} False if the connection is already closed, true otherwise.
* @api public
*/
Spark.readable('write', function write(data) {
var primus = this.primus;
//
// The connection is closed, return false.
//
if (Spark.CLOSED === this.readyState) {
log('attempted to write but readyState was already set to CLOSED for %s', this.id);
return false;
}
this.transforms(primus, this, 'outgoing', data);
return true;
});
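//
// Example sketch: `write` accepts anything the configured encoder understands;
// with the default JSON codec both of these work. The return value only tells
// you whether the spark was still open, not whether delivery succeeded:
//
//   spark.write('plain string');
//   spark.write({ event: 'chat', text: 'hi' });
//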
/**
* The actual message writer.
*
* @param {Mixed} data The message that needs to be written.
* @returns {Boolean}
* @api private
*/
Spark.readable('_write', function _write(data) {
var primus = this.primus
, spark = this;
//
// The connection is closed. Normally this check would already have been done
// in the `spark.write` method, but as `_write` is also used internally we add
// the same check here to prevent potential crashes from writing to a dead
// socket.
//
if (Spark.CLOSED === spark.readyState) {
log('attempted to _write but readyState was already set to CLOSED for %s', spark.id);
return false;
}
primus.encoder.call(spark, data, function encoded(err, packet) {
//
// Do a "safe" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
//
if (err) return new ParserError('Failed to encode outgoing data: '+ err.message, spark, err);
if (!packet) return log('nothing to write, bailing out for %s', spark.id);
//
// Hack 1: \u2028 and \u2029 are allowed inside a JSON string, but JavaScript
// treats them as line terminators. Unescaped line terminators are not allowed
// inside JavaScript string literals, so an unescaped \u2028 or \u2029 breaks
// JSONP responses or any payload that gets evaluated as JavaScript. We work
// around this by escaping these characters.
//
if ('string' === typeof packet) {
if (~packet.indexOf('\u2028')) packet = packet.replace(u2028, '\\u2028');
if (~packet.indexOf('\u2029')) packet = packet.replace(u2029, '\\u2029');
}
spark.emit('outgoing::data', packet);
});
return true;
});
/**
* End the connection.
*
* Options:
* - reconnect (boolean) Trigger client-side reconnect.
*
* @param {Mixed} data Optional closing data.
* @param {Object} options End instructions.
* @api public
*/
Spark.readable('end', function end(data, options) {
if (Spark.CLOSED === this.readyState) return this;
options = options || {};
if (data !== undefined) this.write(data);
//
// If we want to trigger a reconnect, don't send `primus::server::close`.
// Otherwise send it through `._write` instead of `.write`, as this message
// should not be transformed.
//
if (!options.reconnect) this._write('primus::server::close');
//
// This seems redundant but there are cases where the above writes
// can trigger another `end` call. An example is with Engine.IO
// when calling `end` on the client and `end` on the spark right
// after. The `end` call on the spark comes before the `incoming::end`
// event and the result is an attempt to write to a closed socket.
// When this happens Engine.IO closes the connection and without
// this check the following instructions could be executed twice.
//
if (Spark.CLOSED === this.readyState) return this;
log('emitting final events for spark %s', this.id);
this.readyState = Spark.CLOSED;
this.emit('outgoing::end');
this.emit('end');
this.ultron.destroy();
delete this.ultron;
this.queue = null;
return this;
});
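//
// Example sketch: ending a spark from application code. Passing
// `{ reconnect: true }` skips the `primus::server::close` packet, so a
// well-behaved client will try to reconnect on its own:
//
//   spark.end('goodbye');                       // client stays disconnected
//   spark.end(undefined, { reconnect: true });  // client may reconnect
//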
//
// Expose the module.
//
module.exports = Spark;