diff --git a/demo/index.html b/demo/index.html
index a00c1d1..2c68707 100644
--- a/demo/index.html
+++ b/demo/index.html
@@ -194,6 +194,7 @@
Offline Mode
});
client.on("message", function(topic, payload) {
+ console.log(arguments);
alert([topic, payload.toString()].join(": "));
setTimeout(client.end.bind(client), 1000);
diff --git a/demo/src/index.md b/demo/src/index.md
index 639e131..00e27ad 100644
--- a/demo/src/index.md
+++ b/demo/src/index.md
@@ -181,6 +181,7 @@ var client = mqtt.createClient({
});
client.on("message", function(topic, payload) {
+ console.log(arguments);
alert([topic, payload.toString()].join(": "));
setTimeout(client.end.bind(client), 1000);
diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js
index 629eeaa..9077789 100644
--- a/lib/persistence/levelup.js
+++ b/lib/persistence/levelup.js
@@ -126,7 +126,9 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) {
qos: subscriptions[key].qos
};
var levelKey = util.format("%s:%s", key, client.id);
- that._subLobber.add(key, levelKey);
+ if (that._subLobber.match(key).indexOf(levelKey) < 0) {
+ that._subLobber.add(key, levelKey);
+ }
that._subscriptions.put(levelKey, sub, ttl);
});
} else if (done) {
@@ -138,30 +140,36 @@ var nop = function() {};
LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
this._clientSubscriptions.get(client.id, function(err, subscriptions) {
- if (subscriptions && client.clean) {
- that._clientSubscriptions.del(client.id, function() {
- that.streamOfflinePackets(client, nop, function() {
+ that._clientSubscriptions.del(client.id, function() {
+ var levelkeys;
+
+ levelkeys = Object.keys(subscriptions || {}).map(function(key) {
+ // TODO we need to remove these from the subLobber every time.
+ var levelKey = util.format("%s:%s", key, client.id);
+ that._subLobber.remove(key, levelKey);
+ return levelKey;
+ });
- Object.keys(subscriptions).forEach(function(key) {
- var levelKey = util.format("%s:%s", key, client.id);
- that._subLobber.remove(levelKey);
- that._subscriptions.del(levelKey);
+ if (subscriptions && client.clean) {
+ that.streamOfflinePackets(client, nop, function() {
+ that._subscriptions.batch(levelkeys.map(function(levelKey) {
+ return {
+ key: levelKey,
+ type: 'del'
+ };
+ }), function(err) {
+ done(err, {});
});
-
- if (done) {
- done(null, {});
- }
});
- });
- } else {
- if (!subscriptions) {
- subscriptions = {};
- }
-
- if (done) {
- done(null, subscriptions);
- }
- }
+ } else {
+ subscriptions = subscriptions || {};
+
+ if (done) {
+ done(null, subscriptions);
+ }
+ return;
+ }
+ });
});
};
diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js
index 3e903cb..9694172 100644
--- a/lib/persistence/mongo.js
+++ b/lib/persistence/mongo.js
@@ -115,12 +115,17 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) {
});
async.each(subscriptions, function(key, cb) {
- that._subscriptions.insert({
+ that._subscriptions.findAndModify({
client: client.id,
- topic: key,
- qos: client.subscriptions[key].qos,
- added: new Date()
- }, cb);
+ topic: key
+ }, [['date', -1]], {
+ $set: {
+ client: client.id,
+ topic: key,
+ qos: client.subscriptions[key].qos,
+ added: new Date()
+ }
+ }, { upsert: true}, cb);
}, done);
} else if (done) {
return done();
@@ -129,19 +134,27 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) {
MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
- if (client.clean) {
- async.parallel([
- this._subscriptions.remove.bind(this._subscriptions, { client: client.id }),
- this._packets.remove.bind(this._packets, { client: client.id }),
- ], function(err) {
- done(err, {});
- });
- } else {
- this._subscriptions.find({ client: client.id })
- .toArray(function(err, subscriptions) {
+ this._subscriptions.find({ client: client.id })
+ .toArray(function(err, subscriptions) {
+ var toExecute = [
+ function removeSubscriptions(cb) {
+ that._subscriptions.remove({ client: client.id }, cb);
+ }
+ ];
+
+ if (client.clean) {
+ subscriptions = [];
+ toExecute.unshift(function removePackets(cb) {
+ that._packets.remove({ client: client.id }, cb);
+ });
+ }
+
+ subscriptions = subscriptions || [];
+
+ async.parallel(toExecute, function(err) {
var now = Date.now();
- done(err, (subscriptions || []).reduce(function(obj, sub) {
+ done(err, subscriptions.reduce(function(obj, sub) {
// mongodb TTL is not precise
if (sub.added.getTime() + that.options.ttl.subscriptions > now) {
obj[sub.topic] = {
@@ -151,7 +164,7 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
return obj;
}, {}));
});
- }
+ });
};
MongoPersistence.prototype.storeRetained = function(packet, cb) {
diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js
index 4bd44f1..d28c1f4 100644
--- a/lib/persistence/redis.js
+++ b/lib/persistence/redis.js
@@ -194,7 +194,9 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
.expire(clientSubKey, this.options.ttl.subscriptions / 1000);
Object.keys(subscriptions).forEach(function(e) {
- that._subLobber.add(e, client.id);
+ if (that._subLobber.match(e).indexOf(client.id) < 0) {
+ that._subLobber.add(e, client.id);
+ }
});
op.exec(cb);
@@ -238,8 +240,25 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) {
return;
}
- this._client.get("client:sub:" + client.id, function(err, result) {
- cb(err, JSON.parse(result) || {});
+ var key = "client:sub:" + client.id;
+ var subscriptions;
+
+ var multi = this._client.multi();
+ var that = this;
+
+ multi.get(key, function(err, result) {
+
+ subscriptions = JSON.parse(result) || {};
+
+ Object.keys(subscriptions).forEach(function(sub) {
+ that._subLobber.remove(sub, client.id);
+ });
+ });
+
+ multi.del(key);
+
+ multi.exec(function(err) {
+ cb(err, subscriptions);
});
};
diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js
index 7b02d1e..a37c6ea 100644
--- a/test/persistence/abstract.js
+++ b/test/persistence/abstract.js
@@ -310,6 +310,28 @@ module.exports = function(create, buildOpts) {
});
});
+ it("should remove the subscriptions after lookup", function(done) {
+ var instance = this.instance;
+ var client = {
+ id: "my client id - 42",
+ logger: globalLogger,
+ subscriptions: {
+ hello: {
+ qos: 1
+ }
+ }
+ };
+
+ instance.storeSubscriptions(client, function() {
+ instance.lookupSubscriptions(client, function() {
+ instance.lookupSubscriptions(client, function(err, results) {
+ expect(results).to.eql({});
+ done();
+ });
+ });
+ });
+ });
+
it("should allow a clean client to connect", function(done) {
var instance = this.instance;
var client = {
@@ -518,6 +540,18 @@ module.exports = function(create, buildOpts) {
});
});
+ it("should support multiple subscription command", function(done) {
+ var instance = this.instance;
+ instance.storeSubscriptions(client, function() {
+ instance.storeOfflinePacket(packet, function() {
+ instance.streamOfflinePackets(client, function(err, p) {
+ expect(p).to.eql(packet);
+ done();
+ });
+ });
+ });
+ });
+
it("should delete the offline packets once streamed", function(done) {
var instance = this.instance;
instance.storeOfflinePacket(packet, function() {
@@ -578,6 +612,27 @@ module.exports = function(create, buildOpts) {
});
});
+ it("should not store any offline packet for a client after lookup", function(done) {
+ var instance = this.instance;
+ var client = {
+ id: "my client id - 42",
+ clean: false,
+ logger: globalLogger,
+ subscriptions: {
+ hello: 1
+ }
+ };
+
+ instance.lookupSubscriptions(client, function(err, results) {
+ instance.storeOfflinePacket(packet, function() {
+ instance.streamOfflinePackets(client, function(err, p) {
+ done(new Error("this should never be called"));
+ });
+ done();
+ });
+ });
+ });
+
it("should not stream any offline packet to a clean client", function(done) {
var instance = this.instance;
var client = {