Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #236 from mazhack/master
Browse files Browse the repository at this point in the history
fix #235
  • Loading branch information
mcollina committed Apr 10, 2015
2 parents 15cbb49 + 5c2d231 commit 4e4a636
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ AbstractPersistence.prototype.wire = function(server) {
});
};

server.forwardOfflinePackets = function forwardOfflinePackets(client, done) {
server.forwardOfflinePackets = function(client, done) {
that.streamOfflinePackets(client, function(err, packet) {
packet.offline = true;
client.logger.debug({ packet: packet }, "Forwarding offline packet");
Expand Down
4 changes: 4 additions & 0 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
that._offlinePackets.del(data.key, function() {
count--;

// for testing
if (ended && count === 0 && done) {
done();
}
Expand All @@ -273,6 +274,8 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
stream.on("end", function() {
that._cleanupStream(stream);
ended = true;

// for testing
if (count === 0 && done) {
done();
}
Expand All @@ -282,6 +285,7 @@ LevelUpPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
that._cleanupStream(stream);
});

// for testing
if (done) {
stream.on("error", done);
}
Expand Down
25 changes: 20 additions & 5 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ function MongoPersistence(options, done) {
function(cb) {
db.collection("packets", function(err, coll) {
that._packets = coll;
that._packets.ensureIndex("client", cb);
async.parallel([
that._packets.ensureIndex.bind(that._packets, "client"),
that._packets.ensureIndex.bind(that._packets, { "added": 1 }, { expireAfterSeconds: Math.round(that.options.ttl.packets / 1000 )} )
], cb);
});
},
function(cb) {
Expand Down Expand Up @@ -281,26 +284,38 @@ MongoPersistence.prototype.storeOfflinePacket = function(packet, done) {
MongoPersistence.prototype._storePacket = function(client, packet, cb) {
var toStore = {
client: client,
packet: packet
packet: packet,
added: new Date()
};

this._packets.insert(toStore, {w:1}, cb);
};

MongoPersistence.prototype.streamOfflinePackets = function(client, cb) {
MongoPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
if (client.clean) {
return;
}

var stream = this._packets.find({ client: client.id }).stream();
var that = this;

var now = Date.now();

// for testing
if(done)
stream.on("end", done);

stream.on("error", cb);

stream.on("data", function(data) {
data.packet.payload = data.packet.payload.buffer;
cb(null, data.packet);
// mongodb TTL is not precise
// mongodb automaticly remove the packet
if (data.added.getTime() + that.options.ttl.packets > now) {
data.packet.payload = data.packet.payload.buffer;
cb(null, data.packet);
}
});

};

MongoPersistence.prototype.deleteOfflinePacket = function(client, messageId, cb) {
Expand Down
12 changes: 11 additions & 1 deletion lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ RedisPersistence.prototype._storePacket = function(client, packet, cb) {
};

var keyRegexp = /^([^:]+):([^:]+):([^:]+)$/;
RedisPersistence.prototype.streamOfflinePackets = function(client, cb) {
RedisPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
if (this._explicitlyClosed()) {
return cb && cb(new Error('Explicitly closed'));
}
Expand All @@ -377,6 +377,12 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb) {

that._client.lrange(listKey, 0, 10000, function(err, results) {

var total = results.length;

// for testing
if(done && total === 0)
done();

function emit(key, result) {
if (result) {
var match = key.match(keyRegexp);
Expand All @@ -390,10 +396,14 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb) {

function fetch(multi, key) {
multi.get(key, function(err, result) {
total --;
// If we don't get result for given packet key. It means
// that packet has expired. Just clean it from client packets key
if(!result) {
that._client.lrem(listKey, 0, key);
// for testing
if(done && total === 0)
done();
return;
}
emit(key, result);
Expand Down
83 changes: 81 additions & 2 deletions test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -600,10 +600,10 @@ module.exports = function(create, buildOpts) {
});

it("should not stream any offline packet", function(done) {
// ensure persistence engine call 'done'
this.instance.streamOfflinePackets(client, function(err, packet) {
done(new Error("this should never be called"));
});
done();
}, done);
});

it("should store and stream an offline packet", function(done) {
Expand Down Expand Up @@ -1170,4 +1170,83 @@ module.exports = function(create, buildOpts) {

});

describe("offline packets - not send is expired", function() {

var client = {
id : "my client id - 42",
clean : false,
subscriptions : {
"hello/#" : {
qos : 1
}
}
};

it("do not send expires packages", function(done) {
var instance = this.instance;

var packet = {
topic : "hello/42",
qos : 1,
retain : false,
payload : new Buffer("world"),
messageId : "42"
};

instance.storeSubscriptions(client, function() {
instance.storeOfflinePacket(packet, function() {
setTimeout(function() {
instance.streamOfflinePackets(client, function(err, p) {
done(new Error("this should never be called"));
}, done);
}, instance.options.ttl.packets + 500);
});
});
});

it("do not send expires packages - multiple", function(done) {
var instance = this.instance;

var packet1 = {
topic : "hello/42",
qos : 1,
retain : false,
payload : new Buffer("hello"),
messageId : "42"
};

var packet2 = {
topic : "hello/42",
qos : 1,
retain : false,
payload : new Buffer("my"),
messageId : "43"
};

var packet3 = {
topic : "hello/42",
qos : 1,
retain : false,
payload : new Buffer("world"),
messageId : "44"
};

instance.storeSubscriptions(client, function() {
instance.storeOfflinePacket(packet1, function() {
instance.storeOfflinePacket(packet2, function() {
instance.storeOfflinePacket(packet3, function() {
setTimeout(function() {
instance.streamOfflinePackets(client, function(err, p) {
done(new Error("this should never be called"));
}, done);
}, instance.options.ttl.packets + 500);
});
});
});
});
});

});

};

0 comments on commit 4e4a636

Please sign in to comment.