From ea4c394ca6626fa5618ae10792ad2e1654d33e12 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 14 Dec 2013 23:19:53 +0100 Subject: [PATCH] Removed persistent subscription while online. --- lib/persistence/levelup.js | 16 ++++++++++++---- lib/persistence/redis.js | 8 +++++++- test/persistence/abstract.js | 21 +++++++++++++++++++++ 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index bfc0cb7..9077789 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -141,11 +141,18 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { var that = this; this._clientSubscriptions.get(client.id, function(err, subscriptions) { that._clientSubscriptions.del(client.id, function() { + var levelkeys; + + levelkeys = Object.keys(subscriptions || {}).map(function(key) { + // TODO we need to remove these from the subLobber every time. + var levelKey = util.format("%s:%s", key, client.id); + that._subLobber.remove(key, levelKey); + return levelKey; + }); + if (subscriptions && client.clean) { that.streamOfflinePackets(client, nop, function() { - that._subscriptions.batch(Object.keys(subscriptions).map(function(key) { - var levelKey = util.format("%s:%s", key, client.id); - that._subLobber.remove(key, levelKey); + that._subscriptions.batch(levelkeys.map(function(levelKey) { return { key: levelKey, type: 'del' @@ -160,7 +167,8 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) { if (done) { done(null, subscriptions); } - } + return; + } }); }); }; diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 3387c98..d28c1f4 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -243,10 +243,16 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) { var key = "client:sub:" + client.id; var subscriptions; - var multi = this._client.multi() + var multi = this._client.multi(); + var that = this; multi.get(key, function(err, result) { + subscriptions = JSON.parse(result) || {}; + + Object.keys(subscriptions).forEach(function(sub) { + that._subLobber.remove(sub, client.id); + }); }); multi.del(key); diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 28de78f..a37c6ea 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -612,6 +612,27 @@ module.exports = function(create, buildOpts) { }); }); + it("should not store any offline packet for a client after lookup", function(done) { + var instance = this.instance; + var client = { + id: "my client id - 42", + clean: false, + logger: globalLogger, + subscriptions: { + hello: 1 + } + }; + + instance.lookupSubscriptions(client, function(err, results) { + instance.storeOfflinePacket(packet, function() { + instance.streamOfflinePackets(client, function(err, p) { + done(new Error("this should never be called")); + }); + done(); + }); + }); + }); + it("should not stream any offline packet to a clean client", function(done) { var instance = this.instance; var client = {