From 5b41daf91ddb819180aa38695fb8a067e2b45d2c Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 13 Dec 2013 20:11:46 +0100 Subject: [PATCH 1/4] Fix #80 for Mongo and Level persistence, not Redis. --- demo/index.html | 1 + demo/src/index.md | 1 + lib/persistence/levelup.js | 6 ++++-- lib/persistence/mongo.js | 15 ++++++++++----- test/persistence/abstract.js | 12 ++++++++++++ 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/demo/index.html b/demo/index.html index a00c1d1..2c68707 100644 --- a/demo/index.html +++ b/demo/index.html @@ -194,6 +194,7 @@

Offline Mode

}); client.on("message", function(topic, payload) { + console.log(arguments); alert([topic, payload.toString()].join(": ")); setTimeout(client.end.bind(client), 1000); diff --git a/demo/src/index.md b/demo/src/index.md index 639e131..00e27ad 100644 --- a/demo/src/index.md +++ b/demo/src/index.md @@ -181,6 +181,7 @@ var client = mqtt.createClient({ }); client.on("message", function(topic, payload) { + console.log(arguments); alert([topic, payload.toString()].join(": ")); setTimeout(client.end.bind(client), 1000); diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 629eeaa..990027b 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -126,7 +126,9 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) { qos: subscriptions[key].qos }; var levelKey = util.format("%s:%s", key, client.id); - that._subLobber.add(key, levelKey); + if (that._subLobber.match(key).indexOf(levelKey) < 0) { + that._subLobber.add(key, levelKey); + } that._subscriptions.put(levelKey, sub, ttl); }); } else if (done) { @@ -144,7 +146,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { Object.keys(subscriptions).forEach(function(key) { var levelKey = util.format("%s:%s", key, client.id); - that._subLobber.remove(levelKey); + that._subLobber.remove(key, levelKey); that._subscriptions.del(levelKey); }); diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index 3e903cb..befa15d 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -115,12 +115,17 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) { }); async.each(subscriptions, function(key, cb) { - that._subscriptions.insert({ + that._subscriptions.findAndModify({ client: client.id, - topic: key, - qos: client.subscriptions[key].qos, - added: new Date() - }, cb); + topic: key + }, [['date', -1]], { + $set: { + client: client.id, + topic: key, + qos: client.subscriptions[key].qos, + added: new Date() + } + }, { upsert: true}, cb); }, done); } else if (done) { return done(); diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 7b02d1e..8e1148a 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -518,6 +518,18 @@ module.exports = function(create, buildOpts) { }); }); + it("should support multiple subscription command", function(done) { + var instance = this.instance; + instance.storeSubscriptions(client, function() { + instance.storeOfflinePacket(packet, function() { + instance.streamOfflinePackets(client, function(err, p) { + expect(p).to.eql(packet); + done(); + }); + }); + }); + }); + it("should delete the offline packets once streamed", function(done) { var instance = this.instance; instance.storeOfflinePacket(packet, function() { From 07ccdaec458303bed27a5135d039d7a6a5a4d869 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 13 Dec 2013 22:02:27 +0100 Subject: [PATCH 2/4] Fix #80 for Redis. --- lib/persistence/redis.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 4bd44f1..f1b0b3c 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -194,7 +194,9 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) { .expire(clientSubKey, this.options.ttl.subscriptions / 1000); Object.keys(subscriptions).forEach(function(e) { - that._subLobber.add(e, client.id); + if (that._subLobber.match(e).indexOf(client.id) < 0) { + that._subLobber.add(e, client.id); + } }); op.exec(cb); From 1cf36c59ea5a4d8048b55744ffe225989077701f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 14 Dec 2013 12:35:15 +0100 Subject: [PATCH 3/4] Removing subscriptions during lookupSubscriptions Otherwise we will queue offline messages when they are not due. --- lib/persistence/levelup.js | 32 +++++++++++++++----------------- lib/persistence/mongo.js | 32 ++++++++++++++++++++------------ lib/persistence/redis.js | 15 +++++++++++++-- test/persistence/abstract.js | 22 ++++++++++++++++++++++ 4 files changed, 70 insertions(+), 31 deletions(-) diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 990027b..bfc0cb7 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -140,30 +140,28 @@ var nop = function() {}; LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { var that = this; this._clientSubscriptions.get(client.id, function(err, subscriptions) { - if (subscriptions && client.clean) { - that._clientSubscriptions.del(client.id, function() { + that._clientSubscriptions.del(client.id, function() { + if (subscriptions && client.clean) { that.streamOfflinePackets(client, nop, function() { - - Object.keys(subscriptions).forEach(function(key) { + that._subscriptions.batch(Object.keys(subscriptions).map(function(key) { var levelKey = util.format("%s:%s", key, client.id); that._subLobber.remove(key, levelKey); - that._subscriptions.del(levelKey); + return { + key: levelKey, + type: 'del' + }; + }), function(err) { + done(err, {}); }); - - if (done) { - done(null, {}); - } }); - }); - } else { - if (!subscriptions) { - subscriptions = {}; - } + } else { + subscriptions = subscriptions || {}; - if (done) { - done(null, subscriptions); + if (done) { + done(null, subscriptions); + } } - } + }); }); }; diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index befa15d..9694172 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -134,19 +134,27 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) { MongoPersistence.prototype.lookupSubscriptions = function(client, done) { var that = this; - if (client.clean) { - async.parallel([ - this._subscriptions.remove.bind(this._subscriptions, { client: client.id }), - this._packets.remove.bind(this._packets, { client: client.id }), - ], function(err) { - done(err, {}); - }); - } else { - this._subscriptions.find({ client: client.id }) - .toArray(function(err, subscriptions) { + this._subscriptions.find({ client: client.id }) + .toArray(function(err, subscriptions) { + + var toExecute = [ + function removeSubscriptions(cb) { + that._subscriptions.remove({ client: client.id }, cb); + } + ]; + + if (client.clean) { + subscriptions = []; + toExecute.unshift(function removePackets(cb) { + that._packets.remove({ client: client.id }, cb); + }); + } + subscriptions = subscriptions || []; + + async.parallel(toExecute, function(err) { var now = Date.now(); - done(err, (subscriptions || []).reduce(function(obj, sub) { + done(err, subscriptions.reduce(function(obj, sub) { // mongodb TTL is not precise if (sub.added.getTime() + that.options.ttl.subscriptions > now) { obj[sub.topic] = { @@ -156,7 +164,7 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) { return obj; }, {})); }); - } + }); }; MongoPersistence.prototype.storeRetained = function(packet, cb) { diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index f1b0b3c..3387c98 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -240,8 +240,19 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) { return; } - this._client.get("client:sub:" + client.id, function(err, result) { - cb(err, JSON.parse(result) || {}); + var key = "client:sub:" + client.id; + var subscriptions; + + var multi = this._client.multi() + + multi.get(key, function(err, result) { + subscriptions = JSON.parse(result) || {}; + }); + + multi.del(key); + + multi.exec(function(err) { + cb(err, subscriptions); }); }; diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 8e1148a..28de78f 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -310,6 +310,28 @@ module.exports = function(create, buildOpts) { }); }); + it("should remove the subscriptions after lookup", function(done) { + var instance = this.instance; + var client = { + id: "my client id - 42", + logger: globalLogger, + subscriptions: { + hello: { + qos: 1 + } + } + }; + + instance.storeSubscriptions(client, function() { + instance.lookupSubscriptions(client, function() { + instance.lookupSubscriptions(client, function(err, results) { + expect(results).to.eql({}); + done(); + }); + }); + }); + }); + it("should allow a clean client to connect", function(done) { var instance = this.instance; var client = { From ea4c394ca6626fa5618ae10792ad2e1654d33e12 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 14 Dec 2013 23:19:53 +0100 Subject: [PATCH 4/4] Removed persistent subscription while online. --- lib/persistence/levelup.js | 16 ++++++++++++---- lib/persistence/redis.js | 8 +++++++- test/persistence/abstract.js | 21 +++++++++++++++++++++ 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index bfc0cb7..9077789 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -141,11 +141,18 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { var that = this; this._clientSubscriptions.get(client.id, function(err, subscriptions) { that._clientSubscriptions.del(client.id, function() { + var levelkeys; + + levelkeys = Object.keys(subscriptions || {}).map(function(key) { + // TODO we need to remove these from the subLobber every time. + var levelKey = util.format("%s:%s", key, client.id); + that._subLobber.remove(key, levelKey); + return levelKey; + }); + if (subscriptions && client.clean) { that.streamOfflinePackets(client, nop, function() { - that._subscriptions.batch(Object.keys(subscriptions).map(function(key) { - var levelKey = util.format("%s:%s", key, client.id); - that._subLobber.remove(key, levelKey); + that._subscriptions.batch(levelkeys.map(function(levelKey) { return { key: levelKey, type: 'del' @@ -160,7 +167,8 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { if (done) { done(null, subscriptions); } - } + return; + } }); }); }; diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 3387c98..d28c1f4 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -243,10 +243,16 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) { var key = "client:sub:" + client.id; var subscriptions; - var multi = this._client.multi() + var multi = this._client.multi(); + var that = this; multi.get(key, function(err, result) { + subscriptions = JSON.parse(result) || {}; + + Object.keys(subscriptions).forEach(function(sub) { + that._subLobber.remove(sub, client.id); + }); }); multi.del(key); diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 28de78f..a37c6ea 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -612,6 +612,27 @@ module.exports = function(create, buildOpts) { }); }); + it("should not store any offline packet for a client after lookup", function(done) { + var instance = this.instance; + var client = { + id: "my client id - 42", + clean: false, + logger: globalLogger, + subscriptions: { + hello: 1 + } + }; + + instance.lookupSubscriptions(client, function(err, results) { + instance.storeOfflinePacket(packet, function() { + instance.streamOfflinePackets(client, function(err, p) { + done(new Error("this should never be called")); + }); + done(); + }); + }); + }); + it("should not stream any offline packet to a clean client", function(done) { var instance = this.instance; var client = {