diff --git a/lib/persistence/abstract.js b/lib/persistence/abstract.js index d353a12..c3a315d 100644 --- a/lib/persistence/abstract.js +++ b/lib/persistence/abstract.js @@ -115,7 +115,7 @@ AbstractPersistence.prototype.wire = function(server) { }); }; - server.forwardOfflinePackets = function forwardOfflinePackets(client, done) { + server.forwardOfflinePackets = function(client, done) { that.streamOfflinePackets(client, function(err, packet) { packet.offline = true; client.logger.debug({ packet: packet }, "Forwarding offline packet"); diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index 66332ca..264757a 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -261,6 +261,7 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) { that._offlinePackets.del(data.key, function() { count--; + // for testing if (ended && count === 0 && done) { done(); } @@ -273,6 +274,8 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) { stream.on("end", function() { that._cleanupStream(stream); ended = true; + + // for testing if (count === 0 && done) { done(); } @@ -282,6 +285,7 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) { that._cleanupStream(stream); }); + // for testing if (done) { stream.on("error", done); } diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index c20f867..adad091 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -99,7 +99,10 @@ function MongoPersistence(options, done) { function(cb) { db.collection("packets", function(err, coll) { that._packets = coll; - that._packets.ensureIndex("client", cb); + async.parallel([ + that._packets.ensureIndex.bind(that._packets, "client"), + that._packets.ensureIndex.bind(that._packets, { "added": 1 }, { expireAfterSeconds: Math.round(that.options.ttl.packets / 1000 )} ) + ], cb); }); }, function(cb) { @@ -281,13 +284,14 @@ MongoPersistence.prototype.storeOfflinePacket = function(packet, done) { MongoPersistence.prototype._storePacket = function(client, packet, cb) { var toStore = { client: client, - packet: packet + packet: packet, + added: new Date() }; this._packets.insert(toStore, {w:1}, cb); }; -MongoPersistence.prototype.streamOfflinePackets = function(client, cb) { +MongoPersistence.prototype.streamOfflinePackets = function(client, cb, done) { if (client.clean) { return; } @@ -295,12 +299,23 @@ MongoPersistence.prototype.streamOfflinePackets = function(client, cb) { var stream = this._packets.find({ client: client.id }).stream(); var that = this; + var now = Date.now(); + + // for testing + if(done) + stream.on("end", done); + stream.on("error", cb); stream.on("data", function(data) { - data.packet.payload = data.packet.payload.buffer; - cb(null, data.packet); + // mongodb TTL is not precise + // mongodb automaticly remove the packet + if (data.added.getTime() + that.options.ttl.packets > now) { + data.packet.payload = data.packet.payload.buffer; + cb(null, data.packet); + } }); + }; MongoPersistence.prototype.deleteOfflinePacket = function(client, messageId, cb) { diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index aa45fec..617e9b3 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -363,7 +363,7 @@ RedisPersistence.prototype._storePacket = function(client, packet, cb) { }; var keyRegexp = /^([^:]+):([^:]+):([^:]+)$/; -RedisPersistence.prototype.streamOfflinePackets = function(client, cb) { +RedisPersistence.prototype.streamOfflinePackets = function(client, cb, done) { if (this._explicitlyClosed()) { return cb && cb(new Error('Explicitly closed')); } @@ -377,6 +377,12 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb) { that._client.lrange(listKey, 0, 10000, function(err, results) { + var total = results.length; + + // for testing + if(done && total === 0) + done(); + function emit(key, result) { if (result) { var match = key.match(keyRegexp); @@ -390,10 +396,14 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb) { function fetch(multi, key) { multi.get(key, function(err, result) { + total --; // If we don't get result for given packet key. It means // that packet has expired. Just clean it from client packets key if(!result) { that._client.lrem(listKey, 0, key); + // for testing + if(done && total === 0) + done(); return; } emit(key, result); diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 5cbbef2..c1399be 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -600,10 +600,10 @@ module.exports = function(create, buildOpts) { }); it("should not stream any offline packet", function(done) { + // ensure persistence engine call 'done' this.instance.streamOfflinePackets(client, function(err, packet) { done(new Error("this should never be called")); - }); - done(); + }, done); }); it("should store and stream an offline packet", function(done) { @@ -1170,4 +1170,83 @@ module.exports = function(create, buildOpts) { }); + describe("offline packets - not send is expired", function() { + + var client = { + id : "my client id - 42", + clean : false, + subscriptions : { + "hello/#" : { + qos : 1 + } + } + }; + + it("do not send expires packages", function(done) { + var instance = this.instance; + + var packet = { + topic : "hello/42", + qos : 1, + retain : false, + payload : new Buffer("world"), + messageId : "42" + }; + + instance.storeSubscriptions(client, function() { + instance.storeOfflinePacket(packet, function() { + setTimeout(function() { + instance.streamOfflinePackets(client, function(err, p) { + done(new Error("this should never be called")); + }, done); + }, instance.options.ttl.packets + 500); + }); + }); + }); + + it("do not send expires packages - multiple", function(done) { + var instance = this.instance; + + var packet1 = { + topic : "hello/42", + qos : 1, + retain : false, + payload : new Buffer("hello"), + messageId : "42" + }; + + var packet2 = { + topic : "hello/42", + qos : 1, + retain : false, + payload : new Buffer("my"), + messageId : "43" + }; + + var packet3 = { + topic : "hello/42", + qos : 1, + retain : false, + payload : new Buffer("world"), + messageId : "44" + }; + + instance.storeSubscriptions(client, function() { + instance.storeOfflinePacket(packet1, function() { + instance.storeOfflinePacket(packet2, function() { + instance.storeOfflinePacket(packet3, function() { + setTimeout(function() { + instance.streamOfflinePackets(client, function(err, p) { + done(new Error("this should never be called")); + }, done); + }, instance.options.ttl.packets + 500); + }); + }); + }); + }); + }); + + }); + }; +