-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathindex.js
88 lines (74 loc) · 2.28 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
var io = require('socket.io');
var _ = require('underscore');
var Rx = require('rx');
// Reiki wraps a socket.io server and exposes socket events as Rx streams.
//
// listenTo is passed straight to socket.io's listen(), so it accepts the same
// arguments: an http server instance or a port number.
//
// Instance state set up here:
//   subjects    - starts empty; filled with one subject per event type
//   socketsById - starts empty; filled with connected sockets keyed by id
//   io          - whatever io.listen(listenTo) returned
var Reiki = function(listenTo) {
  this.subjects = {};
  this.socketsById = {};
  this.io = io.listen(listenTo);
  this._init(this.io);
};
Reiki.prototype = Object.create({});
// Replacing the prototype object loses the default `constructor` link, which
// would make `instance.constructor` resolve to Object. Put it back as a
// non-enumerable property so it does not show up when enumerating instances.
Object.defineProperty(Reiki.prototype, 'constructor', {
  value: Reiki,
  writable: true,
  enumerable: false,
  configurable: true
});
// Wires up connection handling on the given socket.io server object.
// (The `io` parameter shadows the module-level socket.io import.)
//
// The server's 'connection' events are exposed as this.connectionStream, and
// this method subscribes to that stream. For every socket that connects:
//   - the socket is registered by id,
//   - this.disconnectionStream is replaced with a stream of that socket's
//     'disconnect' events, so it only ever refers to the most recent socket,
//   - the socket is unregistered again once it disconnects,
//   - the socket is hooked into every event type currently in this.subjects.
Reiki.prototype._init = function(io) {
  var self = this;

  var handleConnection = function(socket) {
    var forgetSocket = function() {
      self._removeConnectionById(socket.id);
    };
    var hookEventType = function(subject, eventType) {
      self._addToEventStream(socket, eventType);
    };

    self._addConnectionById(socket);
    self.disconnectionStream = Rx.Observable.fromEvent(socket, 'disconnect');
    socket.on('disconnect', forgetSocket);
    _.each(self.subjects, hookEventType);
  };

  this.connectionStream = Rx.Observable.fromEvent(io, 'connection');
  this.connectionStream.subscribe(handleConnection);
};
// Registers a socket in the socketsById lookup, keyed by its `id` property.
// A socket already stored under the same id is replaced.
Reiki.prototype._addConnectionById = function(socket) {
  var registry = this.socketsById;
  var key = socket.id;
  registry[key] = socket;
};
// Looks up a registered socket by id.
//
// Returns the socket stored under `id`, or undefined when nothing (or only a
// falsy value) is stored there. Only the registry's own keys are consulted, so
// an id that happens to match an inherited Object.prototype member such as
// "toString" or "constructor" is reported as not found instead of returning
// that inherited function.
Reiki.prototype._getConnectionById = function(id) {
  var registry = this.socketsById;
  if (!Object.prototype.hasOwnProperty.call(registry, id)) {
    return undefined;
  }
  return registry[id] || undefined;
};
// Drops the entry for `id` from the socketsById lookup.
// The slot is set to null and then deleted, so afterwards the key is absent.
// Removing an id that was never registered leaves the lookup unchanged.
Reiki.prototype._removeConnectionById = function(id) {
  var registry = this.socketsById;
  registry[id] = null;
  delete registry[id];
};
// Returns the shared Rx.Subject for event type `ev`, creating and storing it
// in this.subjects the first time that event type is requested.
//
// Each connected socket's per-event stream is subscribed into this subject
// (see _addToEventStream), and end users subscribe to the subject itself as an
// observable stream.
//
// Only own keys of this.subjects count as "already created": an event name
// that matches an inherited Object.prototype member such as "toString" or
// "constructor" still gets a real Subject rather than the inherited function.
Reiki.prototype._ensureEventStream = function(ev) {
  var subjects = this.subjects;
  var alreadyCreated =
    Object.prototype.hasOwnProperty.call(subjects, ev) && subjects[ev];
  if (!alreadyCreated) {
    subjects[ev] = new Rx.Subject();
  }
  return subjects[ev];
};
// Returns the shared stream for event type `ev`, creating it on first use.
//
// Sockets that connect after the stream exists are hooked into it by the
// connection handler set up in _init. Sockets that were already connected when
// the stream is first created would otherwise never feed it, so they are
// hooked in here. This happens only when the stream is newly created, which
// keeps repeated calls for the same event type from wiring a socket twice.
Reiki.prototype.createEventStream = function(ev) {
  var previous = this.subjects[ev];
  var stream = this._ensureEventStream(ev);
  if (stream !== previous) {
    Object.keys(this.socketsById).forEach(function(id) {
      this._addToEventStream(this.socketsById[id], ev);
    }, this);
  }
  return stream;
};
// Hooks one socket into the shared stream for event type `ev`.
//
// A fresh per-socket subject is created; every `ev` message the socket emits
// is pushed onto it as { socket: <the socket>, message: <the payload> }. The
// shared subject for `ev` is then subscribed to the per-socket subject, so
// those messages flow on to it. Returns the per-socket subject.
Reiki.prototype._addToEventStream = function(socket, ev) {
  var perSocket = new Rx.Subject();

  var forward = function(data) {
    var envelope = {
      socket: socket,
      message: data
    };
    perSocket.onNext(envelope);
  };
  socket.on(ev, forward);

  var shared = this._ensureEventStream(ev);
  perSocket.subscribe(shared);
  return perSocket;
};
// Shuts the server down: disconnects every registered socket that is not
// already flagged as disconnected, then closes this.io.server.
//
// callback (optional) - invoked at most once. It is handed to the server's
//   close() so it runs when that call reports completion, with whatever
//   arguments close() supplies. If disconnecting or closing throws, it is
//   called with the thrown error instead.
//
// Errors thrown while disconnecting or closing are never rethrown; when no
// callback is available to receive them they are logged.
Reiki.prototype.stop = function(callback) {
  var hasCallback = typeof callback === 'function';
  var finished = false;
  // Single-shot wrapper so the caller is never notified twice.
  var finish = function() {
    if (finished || !hasCallback) {
      return;
    }
    finished = true;
    callback.apply(null, arguments);
  };

  try {
    _(this.socketsById).each(function(socket) {
      if (!socket.disconnected) {
        socket.disconnect();
      }
    });
    this.io.server.close(finish);
  }
  catch (e) {
    if (hasCallback && !finished) {
      finish(e);
    }
    else {
      console.log(e);
    }
  }
};
module.exports = Reiki;