Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distributed store support. #46

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bin/peerjs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ var path = require('path')
allow_discovery: {
demand: false,
description: 'allow discovery of peers'
},
memcached_hosts: {
demand: false,
description: 'memcached hosts'
}
})
.boolean('allow_discovery')
Expand All @@ -71,6 +75,10 @@ if (opts.sslkey && opts.sslcert) {
opts.ssl['certificate'] = fs.readFileSync(path.resolve(opts.sslcert));
}

if (opts.memcached_hosts) {
opts.memcached_hosts = opts.memcached_hosts.split(',');
}

process.on('uncaughtException', function(e) {
console.error('Error: ' + e);
});
Expand Down
269 changes: 263 additions & 6 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ function PeerServer(options) {
concurrent_limit: 5000,
ssl: {},
path: '/',
allow_discovery: false
allow_discovery: false,
memcached_hosts: []
}, options);

util.debug = this._options.debug;
Expand Down Expand Up @@ -55,6 +56,19 @@ function PeerServer(options) {
// Mark concurrent users per ip
this._ips = {};

// Create memcached client.
if (this._options.memcached_hosts.length > 0) {
var Memcached = require('memcached');

if (!this._options.ip || !this._options.port) {
util.prettyError("ip and port must be specified when using memcached.");
return;
}

util.log('memcached_hosts: ', this._options.memcached_hosts);
this._memcached = new Memcached(this._options.memcached_hosts);
}

this._setCleanupIntervals();
}

Expand Down Expand Up @@ -85,6 +99,33 @@ PeerServer.prototype._initializeWSS = function() {
self._checkKey(key, ip, function(err) {
if (!err) {
if (!self._clients[key][id]) {
if (self._memcached) {
self._checkIdExists(key, id, function(state, err) {
if (err) {
socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: 'Server error' } }));
return;
}

switch (state) {
case 'not used':
self._clients[key][id] = { token: token, ip: ip };
self._ips[ip]++;
socket.send(JSON.stringify({ type: 'OPEN' }));
self._configureWS(socket, key, id, token);
break;

case 'used in other server':
socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: id + ' already exists.' } }));
break;

case 'used in this server':
self._configureWS(socket, key, id, token);
break;
}
});
return;
}

self._clients[key][id] = { token: token, ip: ip };
self._ips[ip]++;
socket.send(JSON.stringify({ type: 'OPEN' }));
Expand Down Expand Up @@ -118,6 +159,18 @@ PeerServer.prototype._configureWS = function(socket, key, id, token) {
return;
}

if (this._memcached) {
this._memcached.set(this._createKeyToStoreToMemcached(key, id), {
ip: this._options.ip,
port: this._options.port,
token: token
}, 600000, function(err) {
if (err) {
util.prettyError('Failed to set client info: ' + err);
}
});
}

this._processOutstanding(key, id);

// Cleanup after a socket closes.
Expand Down Expand Up @@ -195,8 +248,15 @@ PeerServer.prototype._initializeHTTP = function() {
// Retrieve guaranteed random ID.
this._app.get(this._options.path + ':key/id', function(req, res, next) {
res.contentType = 'text/html';
res.send(self._generateClientId(req.params.key));
return next();
self._generateClientId(req.params.key, function(id, err) {
if (err) {
util.prettyError('Failed to generate client ID: ' + err);
res.send(JSON.stringify({ type: 'HTTP-ERROR' }));
} else {
res.send(id);
}
return next();
});
});

// Server sets up HTTP streaming when you get post an ID.
Expand All @@ -209,6 +269,34 @@ PeerServer.prototype._initializeHTTP = function() {
if (!self._clients[key] || !self._clients[key][id]) {
self._checkKey(key, ip, function(err) {
if (!err && !self._clients[key][id]) {
if (self._memcached) {
self._checkIdExists(key, id, function(state, err) {
if (err) {
res.send(JSON.stringify({ type: 'HTTP-ERROR' }));
return next();
}

switch (state) {
case 'not used':
self._clients[key][id] = { token: token, ip: ip };
self._ips[ip]++;
self._startStreaming(res, key, id, token, true);
break;

case 'used in other server':
res.send(JSON.stringify({ type: 'HTTP-ERROR' }));
break;

case 'used in this server':
self._startStreaming(res, key, id, token);
break;
}

next();
});
return;
}

self._clients[key][id] = { token: token, ip: ip };
self._ips[ip]++;
self._startStreaming(res, key, id, token, true);
Expand Down Expand Up @@ -246,6 +334,35 @@ PeerServer.prototype._initializeHTTP = function() {

var client;
if (!self._clients[key] || !(client = self._clients[key][id])) {
if (self._memcached) {
self._memcached.get(self._createKeyToStoreToMemcached(key, id), function(err, data) {
if (err) {
util.prettyError('Failed to fetch client info: ' + err);
res.send(500);
return next();
}

if (!data) {
res.send(401);
return next();
}

if (req.params.token !== data.token) {
res.send(401);
} else {
self._handleTransmission(key, {
type: req.body.type,
src: id,
dst: req.body.dst,
payload: req.body.payload
});
res.send(200);
}
return next();
});
return;
}

if (req.params.retry) {
res.send(401);
return next();
Expand Down Expand Up @@ -367,6 +484,48 @@ PeerServer.prototype._setCleanupIntervals = function() {
setInterval(function() {
self._pruneOutstanding();
}, 5000);

if (this._memcached) {
setInterval(function() {
var keys = Object.keys(self._clients);
for (var i = 0, ii = keys.length; i < ii; i += 1) {
var dsts = Object.keys(self._clients[keys[i]]);
for (var j = 0, jj = dsts.length; j < jj; j += 1) {
var key = keys[i], dst = dsts[j];
// Check id is stored in memcached.
self._memcached.get(self._createKeyToStoreToMemcached(key, dst), function(err, data) {
if (err) {
util.prettyError('Failed to fetch client info: ' + err);
return;
}

if (!data || (data.ip === self._options.ip && data.port === self._options.port)) {
self._memcached.set(self._createKeyToStoreToMemcached(key, dst), {
ip: self._options.ip,
port: self._options.port,
token: self._clients[key][dst].token
}, 600000, function(err) {
if (err) {
util.prettyError('Failed to set client info: ' + err);
return;
}
});
return;
}

var client = self._clients[key][dst];
if (client.res) {
client.res.end();
}
if (client.socket) {
client.socket.close();
}
self._removePeer(key, dst);
});
}
}
}, 300000);
}
};

/** Process outstanding peer offers. */
Expand All @@ -383,6 +542,14 @@ PeerServer.prototype._processOutstanding = function(key, id) {

PeerServer.prototype._removePeer = function(key, id) {
if (this._clients[key] && this._clients[key][id]) {
if (this._memcached) {
this._memcached.del(this._createKeyToStoreToMemcached(key, id), function(err) {
if (err) {
util.prettyError('Failed to remove client info: ' + err);
}
});
}

this._ips[this._clients[key][id].ip]--;
delete this._clients[key][id];
this.emit('disconnect', id);
Expand Down Expand Up @@ -427,6 +594,11 @@ PeerServer.prototype._handleTransmission = function(key, message) {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== 'LEAVE' && type !== 'EXPIRE' && dst) {
if (this._memcached) {
this._transferMessage(key, message);
return;
}

var self = this;
if (!this._outstanding[key][dst]) {
this._outstanding[key][dst] = [];
Expand All @@ -441,15 +613,100 @@ PeerServer.prototype._handleTransmission = function(key, message) {
}
};

PeerServer.prototype._generateClientId = function(key) {
PeerServer.prototype._generateClientId = function(key, cb) {
var clientId = util.randomId();

if (!this._clients[key] || !this._clients[key][clientId]) {
if (this._memcached) {
var self = this;
this._checkIdExists(key, clientId, function(state, err) {
if (err) {
return cb(null, err);
}

if (state === 'not used') {
return cb(clientId);
}

self._generateClientId(key, cb);
});
return;
}
}

if (!this._clients[key]) {
return clientId;
return cb(clientId);
}
while (!!this._clients[key][clientId]) {
clientId = util.randomId();
}
return clientId;
return cb(clientId);
};

PeerServer.prototype._createKeyToStoreToMemcached = function(key, id) {
return id + '@' + key;
};

PeerServer.prototype._checkIdExists = function(key, id, cb) {
this._memcached.get(this._createKeyToStoreToMemcached(key, id), function(err, data) {
if (err) {
util.prettyError('Failed to fetch client info: ' + err);
return cb(null, err);
}

// id is not used.
if (!data) {
return cb('not used');
}

// id is already used in other server.
if (data.ip !== self._options.ip || data.port !== self._options.port) {
return cb('used in other server');
}

// id is used in this server.
cb('used in this server');
});
};

PeerServer.prototype._transferMessage = function(key, message) {
var self = this;
var type = message.type;
var src = message.src;
var dst = message.dst;

this._memcached.get(this._createKeyToStoreToMemcached(key, dst), function(err, data) {
if (err) {
util.prettyError('Failed to fetch client info: ' + err);
return;
}

if (!data) {
util.prettyError('Entry not found (key: ' + key + ', id: ' + dst + ')');
return;
}

if (data.ip === self._options.ip && data.port === self._options.port) {
if (!self._outstanding[key][dst]) {
self._outstanding[key][dst] = [];
}
self._outstanding[key][dst].push(message);
return;
}

var scheme = self._options.ssl.name ? 'https://' : 'http://';
restify.createJsonClient({
url: scheme + data.ip + ':' + data.port
}).post(self._options.path + [
key, src, self._clients[key][src].token, type.toLowerCase()
].join('/'), {
type: type, dst: dst, payload: message.payload
}, function(err, req, res, obj) {
if (err) {
util.prettyError('Failed to transfer message: ' + err);
}
});
});
};

exports.PeerServer = PeerServer;
4 changes: 3 additions & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ describe('PeerServer', function() {
});

it('should generate a 16-character ID', function() {
expect(p._generateClientId('anykey').length).to.be(16);
p._generateClientId('anykey', function(id, err) {
expect(id.length).to.be(16);
});
});
});
});