-
-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathmultiplex_server.js
78 lines (70 loc) · 2.29 KB
/
multiplex_server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
var events = require('events');
var stream = require('stream');
exports.MultiplexServer = MultiplexServer = function(service) {
var that = this;
this.registered_channels = {};
this.service = service;
this.service.on('connection', function(conn) {
var channels = {};
conn.on('data', function(message) {
var t = message.split(',');
var type = t.shift(), topic = t.shift(), payload = t.join();
if (!(topic in that.registered_channels)) {
return;
}
if (topic in channels) {
var sub = channels[topic];
switch(type) {
case 'uns':
delete channels[topic];
sub.emit('close');
break;
case 'msg':
sub.emit('data', payload);
break;
}
} else {
switch(type) {
case 'sub':
var sub = channels[topic] = new Channel(conn, topic,
channels);
that.registered_channels[topic].emit('connection', sub)
break;
}
}
});
conn.on('close', function() {
for (topic in channels) {
channels[topic].emit('close');
}
channels = {};
});
});
};
MultiplexServer.prototype.registerChannel = function(name) {
return this.registered_channels[escape(name)] = new events.EventEmitter();
};
var Channel = function(conn, topic, channels) {
this.conn = conn;
this.topic = topic;
this.channels = channels;
stream.Stream.call(this);
};
Channel.prototype = new stream.Stream();
Channel.prototype.write = function(data) {
this.conn.write('msg,' + this.topic + ',' + data);
};
Channel.prototype.end = function(data) {
var that = this;
if (data) this.write(data);
if (this.topic in this.channels) {
this.conn.write('uns,' + this.topic);
delete this.channels[this.topic];
process.nextTick(function(){that.emit('close');});
}
};
Channel.prototype.destroy = Channel.prototype.destroySoon =
function() {
this.removeAllListeners();
this.end();
};