Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #182 from mocheng/fix_92
Browse files Browse the repository at this point in the history
publish subscribes and connected-client
  • Loading branch information
mcollina committed Aug 8, 2014
2 parents ba46393 + bf01dd4 commit e8c3858
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
30 changes: 30 additions & 0 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var defaults = {
wildcardSome: '#'
},
stats: true,
publishNewClient: true,
maxInflightMessages: 1024,
logger: {
name: "mosca",
Expand Down Expand Up @@ -92,6 +93,7 @@ var nop = function() {};
* - `bundle`, serve the bundled mqtt client
* - `static`, serve a directory
* - `stats`, publish the stats every 10s (default false).
* - `publishNewClient`, publish message to topic "$SYS/{broker-id}/new/clients" when new client connects.
*
* Events:
* - `clientConnected`, when a client is connected;
Expand Down Expand Up @@ -242,9 +244,37 @@ function Server(opts, callback) {
]);

that.on("clientConnected", function(client) {
if(that.opts.publishNewClient) {
that.publish({
topic: "$SYS/" + that.id + "/new/clients",
payload: client.id
});
}

this.clients[client.id] = client;
});

that.ascoltatore.subscribe(
"$SYS/+/new/clients",
function(topic, payload) {
var serverId, clientId;

serverId = topic.split('/')[1];
clientId = payload;

if(that.clients[clientId] && serverId !== that.id) {
that.clients[clientId].close();
}
}
);

that.on("subscribed", function(topic, client) {
that.publish({
topic: "$SYS/" + that.id + "/new/subscribes",
payload: client.id
});
});

that.on("clientDisconnected", function(client) {
delete this.clients[client.id];
});
Expand Down
140 changes: 140 additions & 0 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = function(moscaSettings, createConnection) {

beforeEach(function(done) {
settings = moscaSettings();
settings.publishNewClient = false;
instance = new mosca.Server(settings, done);
this.instance = instance;
this.settings = settings;
Expand Down Expand Up @@ -58,6 +59,145 @@ module.exports = function(moscaSettings, createConnection) {
});
}

it("should publish connected client to '$SYS/{broker-id}/new/clients'", function(done) {
var connectedClient = null,
publishedClientId = null;

settings = moscaSettings();
settings.publishNewClient = true;

function verify() {
if (connectedClient && publishedClientId) {
expect(publishedClientId).to.be.equal(connectedClient.opts.clientId);
connectedClient.disconnect();
}
}

secondInstance = new mosca.Server(settings, function(err, server) {
server.on("published", function(packet, clientId) {
expect(packet.topic).to.be.equal("$SYS/" + secondInstance.id + "/new/clients");
publishedClientId = packet.payload;
verify();
});

buildAndConnect(done, function(client) {
connectedClient = client;
verify();
});
});
});

it("should publish each subscribe to '$SYS/{broker-id}/new/subscribes'", function(done) {
var d = donner(2, done),
connectedClient = null,
publishedClientId = null;

function verify() {
if (connectedClient && publishedClientId) {
expect(publishedClientId).to.be.equal(connectedClient.opts.clientId);
d();
}
}

instance.on("published", function(packet) {
expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes");
publishedClientId = packet.payload;
verify();
});

buildAndConnect(d, function(client) {
var messageId = Math.floor(65535 * Math.random());
var subscriptions = [{
topic: "hello",
qos: 1
}
];

connectedClient = client;

client.on("suback", function(packet) {
client.disconnect();
});

client.subscribe({
subscriptions: subscriptions,
messageId: messageId
});
});

});

describe("multi mosca servers", function() {
var serverOne = null,
serverTwo = null,
clientOpts = buildOpts();

afterEach(function(done) {
var instances = [];
instances.push(serverOne);
instances.push(serverTwo);

async.each(instances, function(instance, cb) {
if (instance) {
instance.close(cb);
} else {
cb();
}
}, done);
});

it("should disconnect client connected to another broker", function(done) {
var settingsOne = moscaSettings(),
settingsTwo = moscaSettings();

if (!settings.backend || !settings.backend.type) {
// only need to validate cases with backend
return done();
}

clientOpts.clientId = '123456';
clientOpts.keepalive = 0;

settingsOne.publishNewClient = settingsTwo.publishNewClient = true;

settingsOne.backend = settingsTwo.backend = settings.backend;

async.series([
function(cb) {
serverOne = new mosca.Server(settingsOne, function(err, server) {
serverOne.on('clientDisconnected', function(serverClient) {
expect(serverClient).not.to.be.equal(undefined);
done();
});
cb();
});
},
function(cb) {
serverTwo = new mosca.Server(settingsTwo, function(err, server) {
cb();
});
},
function(cb) {
var clientOne = createConnection(settingsOne.port, settingsOne.host);
clientOne.connect(clientOpts);

clientOne.on("connected", function() {
cb();
});
},
function(cb) {
var clientTwo = createConnection(settingsTwo.port, settingsTwo.host);
clientTwo.connect(clientOpts);

clientTwo.on("connected", function() {
cb();
});
}
]);
});

});

it("should pass itself in the callback", function(done) {
secondInstance = new mosca.Server(moscaSettings(), function(err, server) {
expect(server === secondInstance).to.be.true;
Expand Down
1 change: 1 addition & 0 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var moscaSettings = function() {
return {
port: nextPort(),
stats: false,
publishNewClient: false,
persistence: {
factory: mosca.persistence.Memory
},
Expand Down
1 change: 1 addition & 0 deletions test/server_mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ describe("mosca.Server with mongo persistence", function() {
return {
port: nextPort(),
stats: false,
publishNewClient: false,
logger: {
childOf: globalLogger,
level: 60
Expand Down
1 change: 1 addition & 0 deletions test/server_redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ describe("mosca.Server with redis persistence", function() {
return {
port: nextPort(),
stats: false,
publishNewClient: false,
logger: {
childOf: globalLogger,
level: 60
Expand Down

0 comments on commit e8c3858

Please sign in to comment.