Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #77 from mcollina/v0.14.3-wip
Browse files Browse the repository at this point in the history
V0.14.3 wip
  • Loading branch information
mcollina committed Dec 11, 2013
2 parents 3343b1d + ad6b546 commit 9d4ce43
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 20 deletions.
13 changes: 11 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,20 @@ Client.prototype.close = function(callback) {
}
};

that.connection.stream.on('end' , function(){
that._streamClosedRequiresCleanup = true;
});

that._closing = true;

async.parallel(Object.keys(that.subscriptions).map(that.unsubscribeMapTo.bind(that)), function() {
that.server.persistClient(that);
that.connection.stream.on('end', cleanup);
that.connection.stream.end();
if(that._streamClosedRequiresCleanup){
cleanup();
} else {
that.connection.stream.on('end', cleanup);
that.connection.stream.end();
}
});
};

Expand Down
20 changes: 15 additions & 5 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,24 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
};

MongoPersistence.prototype.storeRetained = function(packet, cb) {
var that = this;
this._retained.remove({ topic: packet.topic }, function(err) {
that._retained.insert(packet, function(err) {
if (cb) {

this._retained.findAndModify(
{ topic: packet.topic },
[],
packet,
{
upsert: true,
new: true
},
function(err, result){
if(!err) {
packet._id = result._id;
}
if(cb) {
return cb(err);
}
});
});

};

MongoPersistence.prototype.lookupRetained = function(pattern, cb) {
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@
"chai": "~1.8.0",
"sinon": "~1.7.3",
"sinon-chai": "~2.1.2",
"async_bench": "0.2.1",
"underscore": "~1.4.4",
"dox-foundation": "~0.5.4",
"jshint": "~2.3.0",
"microtime": "~0.5.0",
"js-beautify": "~0.4.2",
"tmp": "0.0.22",
"supertest": "~0.8.1",
"coveralls": "~2.5.0",
"istanbul": "~0.1.45"
"istanbul": "~0.1.45",
"async_bench": "~0.3.0"
},
"dependencies": {
"mqtt": "~0.3.7",
Expand Down Expand Up @@ -85,6 +86,6 @@
"zmq": "~2.4.0",
"amqp": "~0.1.8",
"redis": "~0.8.2",
"mongodb": "~1.3.20"
"mongodb": "~1.3.19"
}
}
110 changes: 106 additions & 4 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -1271,10 +1271,9 @@ module.exports = function(moscaSettings, createConnection) {

client.on('connack', function(packet) {
var subscriptions = [{
topic: "hello",
qos: 0
}
];
topic: "hello",
qos: 0
}];

client.subscribe({
subscriptions: subscriptions,
Expand All @@ -1293,6 +1292,109 @@ module.exports = function(moscaSettings, createConnection) {
], done);
});

it("should return only a single retained message", function(done) {

var pers = new mosca.persistence.Memory();

pers.wire(instance);

async.waterfall([

function(cb) {
var client = createConnection(settings.port, settings.host);
client.name = "Phase 1";
var defaultMessage = {
topic: "hello",
qos: 0,
payload: null,
messageId: null,
retain: true
};

client.on("connected", function() {
var opts = buildOpts();
opts.clean = true;

var totalMessages = 10;
var publishCount = 0;

client.connect(opts);

client.on('publish', function(packet){
publishCount++;
if(publishCount == totalMessages) {
client.stream.end();
cb();
}

});

client.on('connack', function(packet) {

var subscriptions = [{
topic: "hello",
qos: 0
}];

client.subscribe({
subscriptions: subscriptions,
messageId: 20
});

for(var c = 1 ; c <= 10 ; c++)
{
defaultMessage.payload = (c == totalMessages) ? new Buffer("Final Message") : new Buffer("Message " + c);
defaultMessage.messageId = 40 + c;
client.publish(defaultMessage);
}

});
});
},

function(cb) {
var client = createConnection(settings.port, settings.host);

var retainedReceivedCount = 0;

client.on("connected", function() {
var opts = buildOpts();
opts.clean = true;

client.connect(opts);

client.on('connack', function(packet) {
var subscriptions = [{
topic: "hello",
qos: 0
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 20
});
});

var handleTimeout = function() {
expect(retainedReceivedCount).to.be.equal(1);
client.stream.end();
cb();
};

var timeout;

client.on("publish", function(packet) {
clearInterval(timeout);
timeout = setTimeout(handleTimeout, 100);
retainedReceivedCount ++;
});

});
}
], done);
});

it("should support unclean clients", function(done) {
var pers = new mosca.persistence.Memory();
var opts = buildOpts();
Expand Down
8 changes: 6 additions & 2 deletions test/cli_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var async = require("async");
var tmp = require('tmp');
var fs = require("fs");
var mqtt = require("mqtt");
var os = require("os");

var SECURE_KEY = __dirname + '/secure/tls-key.pem';
var SECURE_CERT = __dirname + '/secure/tls-cert.pem';
Expand Down Expand Up @@ -318,12 +319,15 @@ describe("mosca.cli", function() {
});
});

it("should reload the config using if killed with SIGHUP", function(done) {
it("should reload the current config if killed with SIGHUP on a Linux-based OS", function(done) {

if(os.platform() === "win32") return done();

args.push("adduser");
args.push("myuser");
args.push("mypass");
args.push("--credentials");

var cloned = null;

async.waterfall([
Expand Down
33 changes: 33 additions & 0 deletions test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,39 @@ module.exports = function(create, buildOpts) {
], done);
});

it("should match and load a single retained message", function(done) {

var packetMessageId = 0;

var getPacket = function(){

packetMessageId++;

return {
topic: "hello",
qos: 0,
payload: new Buffer("world"),
messageId: packetMessageId,
retain: true
};
};

var instance = this.instance;

async.parallel([
function(cb) { instance.storeRetained(getPacket(), cb); },
function(cb) { instance.storeRetained(getPacket(), cb); },
function(cb) { instance.storeRetained(getPacket(), cb); },
function(cb) { instance.storeRetained(getPacket(), cb); },
function(cb) { instance.storeRetained(getPacket(), cb); }
], function(err, results) {
instance.lookupRetained("hello", function(err, results) {
expect(results.length).to.be.eql(1);
done();
});
});
});

it("should overwrite a retained message", function(done) {
var packet = {
topic: "hello",
Expand Down
1 change: 0 additions & 1 deletion test/persistence/mongo_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var abstract = require("./abstract");
var Mongo = require("../../").persistence.Mongo;
var redis = require("redis");
var MongoClient = require('mongodb').MongoClient;
var async = require("async");

Expand Down
2 changes: 1 addition & 1 deletion test/static/test
Original file line number Diff line number Diff line change
@@ -1 +1 @@
42
42
2 changes: 1 addition & 1 deletion test/websocket_secure_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ describe("mosca.Server - Secure Websocket", function() {
var curPort = nextPort() - 1;
var req = request("https://localhost:" + curPort);

req.get('/test').expect(200, "42\n").end(done);
req.get('/test').expect(200, "42").end(done);
});

it("should serve a browserify bundle", function(done) {
Expand Down
2 changes: 1 addition & 1 deletion test/websocket_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("mosca.Server - Websocket", function() {
var curPort = nextPort() - 1;
var req = request("http://localhost:" + curPort);

req.get('/test').expect(200, "42\n").end(done);
req.get('/test').expect(200, "42").end(done);
});

it("should serve a browserify bundle", function(done) {
Expand Down

0 comments on commit 9d4ce43

Please sign in to comment.