-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
262 lines (229 loc) · 7.66 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
'use strict';
var debug = require('diagnostics')('primus:mirage')
, crypto = require('crypto')
, one = require('one-time')
, mirage = module.exports;
/**
 * Client half of the mirage plugin. It keeps track of the session id on the
 * Primus instance, appends that id to the connection URL and holds back
 * outgoing messages for as long as no id is known.
 *
 * @param {Primus} primus The Primus connection.
 * @param {Object} options The options that were supplied to the Primus constructor.
 * @api public
 */
mirage.client = function client(primus, options) {
  //
  // An id that is already set on the instance wins over the `mirage` option.
  // When neither is present we start with an empty string, which is treated
  // as "no id yet" by the checks below.
  //
  primus.mirage = primus.mirage || options.mirage || '';

  /**
   * Store the session id delivered by a `mirage` event and replay every
   * message that was queued while we did not have one.
   *
   * @param {String} id The session id to use from now on.
   * @api private
   */
  primus.on('mirage', function mirage(id) {
    primus.mirage = id;

    if (!primus.buffer.length) return;

    //
    // Replay from a copy and empty the real queue first: `_write` pushes
    // a message back onto `primus.buffer` whenever it still cannot be sent.
    //
    var queued = primus.buffer.slice();
    primus.buffer.length = 0;

    for (var index = 0, total = queued.length; index < total; index++) {
      primus._write(queued[index]);
    }
  });

  /**
   * Add a `_mirage` key to the query string of the outgoing URL when we
   * already have a session id. Without an id the options are left untouched.
   *
   * @param {Object} options The request options.
   * @api private
   */
  primus.on('outgoing::url', function url(options) {
    if (primus.mirage) {
      var params = primus.querystring(options.query || '');

      params._mirage = primus.mirage;
      options.query = primus.querystringify(params);
    }
  });

  /**
   * The actual message writer.
   *
   * @NOTE: This is a copy of Primus's own `_write` method with one addition:
   * `primus.mirage` has to be set before anything is written to the server.
   * `Primus` is not defined in this file, presumably it is the global client
   * constructor -- confirm against the Primus client build.
   *
   * @param {Mixed} data The message that needs to be written.
   * @returns {Boolean} `true` when the message was handed to the encoder,
   * `false` when it was queued instead.
   * @api private
   */
  primus._write = function write(data) {
    //
    // `_write` is also used internally, so the open-connection check that
    // `write` normally performs is repeated here to avoid writing to a dead
    // socket. On top of that we refuse to write until we know our id.
    //
    var writable = Primus.OPEN === primus.readyState && primus.mirage;

    if (!writable) {
      //
      // Queue is at capacity: drop the oldest message to make room.
      //
      if (primus.buffer.length === primus.options.queueSize) {
        primus.buffer.shift();
      }

      primus.buffer.push(data);
      return false;
    }

    primus.encoder(data, function encoded(err, packet) {
      if (!err) {
        primus.emit('outgoing::data', packet);
        return;
      }

      //
      // Only emit `error` when somebody is listening for it. Listening to
      // errors should be optional, so an encoding failure must never throw.
      //
      return primus.listeners('error').length && primus.emit('error', err);
    });

    return true;
  };
};
/**
 * Server half of the mirage plugin. Every incoming connection either gets a
 * freshly generated session id or has the id it supplied validated, and the
 * `connection` event stays blocked until that has completed.
 *
 * @param {Primus} primus Server side Primus instance.
 * @param {Object} options The options that were supplied to the Primus constructor.
 * @api private
 */
mirage.server = function server(primus, options) {
  /**
   * Default session id generator: 8 random bytes, hex encoded. A generator
   * calls the callback with the id as second argument, or with an error as
   * first argument when generation failed.
   *
   * It is only used when the request did not carry an id.
   *
   * @param {Spark} spark The incoming connection.
   * @param {Function} fn Error first completion callback.
   * @api public
   */
  var gen = function gen(spark, fn) {
    crypto.randomBytes(8, function generated(err, bytes) {
      if (!err) fn(undefined, bytes.toString('hex'));
      else fn(err);
    });
  };

  /**
   * Default validator for connections that supply an existing id: it accepts
   * the id as-is by completing without an error and without a new id.
   *
   * @param {Spark} spark Reference to the spark instance that is connecting.
   * @param {Function} fn Error first completion callback.
   * @api public
   */
  var valid = function valid(spark, fn) {
    return fn();
  };

  primus.id = {
    /**
     * Replace the session id generator. Anything that is not a function is
     * ignored.
     *
     * @param {Function} fn Generator, called with the spark and an error first callback.
     * @returns {Function} The `server` plugin function.
     * @api public
     */
    generator: function generator(fn) {
      if (typeof fn === 'function') gen = fn;

      return server;
    },

    /**
     * Maximum time, in milliseconds, that generating or validating an id may
     * take. Read from the `mirage timeout` option, 5000 when that is not set.
     * When it expires the connection is failed with an error.
     *
     * @type {Number}
     * @api public
     */
    timeout: options['mirage timeout'] || 5000,

    /**
     * Replace the session id validator. Anything that is not a function is
     * ignored.
     *
     * @param {Function} fn Validator, called with the spark and an error first callback.
     * @returns {Function} The `server` plugin function.
     * @api public
     */
    validator: function validator(fn) {
      if (typeof fn === 'function') valid = fn;

      return server;
    }
  };

  /**
   * Intercept incoming connections and hold the `connection` event back until
   * the spark has a generated or validated session id.
   *
   * @param {Spark} spark Incoming connection.
   * @param {Function} fn Completion callback.
   * @api private
   */
  primus.on('connection', function connection(spark, fn) {
    var timer;

    debug('validating new incoming connection');

    spark.mirage = spark.query._mirage;

    //
    // While this is an array the `incoming` transformer below parks every
    // received message in it instead of letting it through.
    //
    spark.buffer = [];

    /**
     * Completion callback handed to the generator or validator. It is wrapped
     * with `one` and shared with the timeout below, so whichever of the two
     * finishes first decides the outcome.
     *
     * @param {Error} err Generation or validation failure.
     * @param {String} id New session id for the client, optional.
     * @api private
     */
    var done = one(function processed(err, id) {
      clearTimeout(timer);

      //
      // Detach the queue from the spark so the transformer stops buffering.
      //
      var queued = spark.buffer;
      spark.buffer = null;

      //
      // An id can come from the generator, but a validator may also hand out
      // a replacement id instead of bluntly disconnecting the user.
      //
      if (id) {
        var method = spark.send ? 'send' : 'emit';

        spark[method]('mirage', id);
        spark.mirage = id;
      }

      //
      // On failure we cannot tell whether this user is allowed, so the error
      // goes to the callback and the queued messages are discarded.
      //
      if (err) {
        debug('failed to process request due to %s', err.message);
        return fn(err);
      }

      fn();

      //
      // Replay only after `fn` has unblocked the `connection` event, so the
      // user has had the chance to add a `data` listener to the spark.
      //
      debug('writing %d queued messages to spark', queued.length);

      for (var i = 0, total = queued.length; i < total; i++) {
        spark.transforms(primus, spark, 'incoming', queued[i].data, queued[i].raw);
      }
    });

    //
    // Guard against a generator or validator that takes too long or never
    // calls back at all.
    //
    timer = setTimeout(function expired() {
      var action = spark.mirage ? 'validate' : 'generate';

      done(new Error('Failed to '+ action +' id in a timely manner'));
    }, primus.id.timeout);

    if (!spark.mirage) {
      debug('generating new id as none was supplied');
      gen.call(primus, spark, done);
      return;
    }

    debug('found existing mirage id (%s) in query, validating', spark.mirage);
    return valid.call(primus, spark, done);
  });

  /**
   * Incoming message transformer that queues messages arriving while an id
   * is still being generated or validated. It is called with the spark as
   * `this`.
   *
   * @param {Object} packet The incoming data message.
   * @returns {Boolean|Undefined} `false` when the message was queued.
   * @api public
   */
  primus.transform('incoming', function incoming(packet) {
    var pending = this.buffer;

    if (!pending) return;

    pending.push(packet);
    return false;
  });
};