Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #80 from mcollina/fix-80
Browse files Browse the repository at this point in the history
Use persistence MongoDB, but there is a problem subscriptions
  • Loading branch information
mcollina committed Dec 15, 2013
2 parents dfd58ba + ea4c394 commit e72aa51
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 42 deletions.
1 change: 1 addition & 0 deletions demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ <h2>Offline Mode</h2>
<span class="p">});</span>

<span class="nx">client</span><span class="p">.</span><span class="nx">on</span><span class="p">(</span><span class="s2">&quot;message&quot;</span><span class="p">,</span> <span class="kd">function</span><span class="p">(</span><span class="nx">topic</span><span class="p">,</span> <span class="nx">payload</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">console</span><span class="p">.</span><span class="nx">log</span><span class="p">(</span><span class="nx">arguments</span><span class="p">);</span>
<span class="nx">alert</span><span class="p">([</span><span class="nx">topic</span><span class="p">,</span> <span class="nx">payload</span><span class="p">.</span><span class="nx">toString</span><span class="p">()].</span><span class="nx">join</span><span class="p">(</span><span class="s2">&quot;: &quot;</span><span class="p">));</span>

<span class="nx">setTimeout</span><span class="p">(</span><span class="nx">client</span><span class="p">.</span><span class="nx">end</span><span class="p">.</span><span class="nx">bind</span><span class="p">(</span><span class="nx">client</span><span class="p">),</span> <span class="mi">1000</span><span class="p">);</span>
Expand Down
1 change: 1 addition & 0 deletions demo/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ var client = mqtt.createClient({
});
client.on("message", function(topic, payload) {
console.log(arguments);
alert([topic, payload.toString()].join(": "));
setTimeout(client.end.bind(client), 1000);
Expand Down
52 changes: 30 additions & 22 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) {
qos: subscriptions[key].qos
};
var levelKey = util.format("%s:%s", key, client.id);
that._subLobber.add(key, levelKey);
if (that._subLobber.match(key).indexOf(levelKey) < 0) {
that._subLobber.add(key, levelKey);
}
that._subscriptions.put(levelKey, sub, ttl);
});
} else if (done) {
Expand All @@ -138,30 +140,36 @@ var nop = function() {};
LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
this._clientSubscriptions.get(client.id, function(err, subscriptions) {
if (subscriptions && client.clean) {
that._clientSubscriptions.del(client.id, function() {
that.streamOfflinePackets(client, nop, function() {
that._clientSubscriptions.del(client.id, function() {
var levelkeys;

levelkeys = Object.keys(subscriptions || {}).map(function(key) {
// TODO we need to remove these from the subLobber every time.
var levelKey = util.format("%s:%s", key, client.id);
that._subLobber.remove(key, levelKey);
return levelKey;
});

Object.keys(subscriptions).forEach(function(key) {
var levelKey = util.format("%s:%s", key, client.id);
that._subLobber.remove(levelKey);
that._subscriptions.del(levelKey);
if (subscriptions && client.clean) {
that.streamOfflinePackets(client, nop, function() {
that._subscriptions.batch(levelkeys.map(function(levelKey) {
return {
key: levelKey,
type: 'del'
};
}), function(err) {
done(err, {});
});

if (done) {
done(null, {});
}
});
});
} else {
if (!subscriptions) {
subscriptions = {};
}

if (done) {
done(null, subscriptions);
}
}
} else {
subscriptions = subscriptions || {};

if (done) {
done(null, subscriptions);
}
return;
}
});
});
};

Expand Down
47 changes: 30 additions & 17 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,17 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) {
});

async.each(subscriptions, function(key, cb) {
that._subscriptions.insert({
that._subscriptions.findAndModify({
client: client.id,
topic: key,
qos: client.subscriptions[key].qos,
added: new Date()
}, cb);
topic: key
}, [['date', -1]], {
$set: {
client: client.id,
topic: key,
qos: client.subscriptions[key].qos,
added: new Date()
}
}, { upsert: true}, cb);
}, done);
} else if (done) {
return done();
Expand All @@ -129,19 +134,27 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) {

MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
if (client.clean) {
async.parallel([
this._subscriptions.remove.bind(this._subscriptions, { client: client.id }),
this._packets.remove.bind(this._packets, { client: client.id }),
], function(err) {
done(err, {});
});
} else {
this._subscriptions.find({ client: client.id })
.toArray(function(err, subscriptions) {
this._subscriptions.find({ client: client.id })
.toArray(function(err, subscriptions) {

var toExecute = [
function removeSubscriptions(cb) {
that._subscriptions.remove({ client: client.id }, cb);
}
];

if (client.clean) {
subscriptions = [];
toExecute.unshift(function removePackets(cb) {
that._packets.remove({ client: client.id }, cb);
});
}

subscriptions = subscriptions || [];

async.parallel(toExecute, function(err) {
var now = Date.now();
done(err, (subscriptions || []).reduce(function(obj, sub) {
done(err, subscriptions.reduce(function(obj, sub) {
// mongodb TTL is not precise
if (sub.added.getTime() + that.options.ttl.subscriptions > now) {
obj[sub.topic] = {
Expand All @@ -151,7 +164,7 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
return obj;
}, {}));
});
}
});
};

MongoPersistence.prototype.storeRetained = function(packet, cb) {
Expand Down
25 changes: 22 additions & 3 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
.expire(clientSubKey, this.options.ttl.subscriptions / 1000);

Object.keys(subscriptions).forEach(function(e) {
that._subLobber.add(e, client.id);
if (that._subLobber.match(e).indexOf(client.id) < 0) {
that._subLobber.add(e, client.id);
}
});

op.exec(cb);
Expand Down Expand Up @@ -238,8 +240,25 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) {
return;
}

this._client.get("client:sub:" + client.id, function(err, result) {
cb(err, JSON.parse(result) || {});
var key = "client:sub:" + client.id;
var subscriptions;

var multi = this._client.multi();
var that = this;

multi.get(key, function(err, result) {

subscriptions = JSON.parse(result) || {};

Object.keys(subscriptions).forEach(function(sub) {
that._subLobber.remove(sub, client.id);
});
});

multi.del(key);

multi.exec(function(err) {
cb(err, subscriptions);
});
};

Expand Down
55 changes: 55 additions & 0 deletions test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,28 @@ module.exports = function(create, buildOpts) {
});
});

it("should remove the subscriptions after lookup", function(done) {
var instance = this.instance;
var client = {
id: "my client id - 42",
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
}
}
};

instance.storeSubscriptions(client, function() {
instance.lookupSubscriptions(client, function() {
instance.lookupSubscriptions(client, function(err, results) {
expect(results).to.eql({});
done();
});
});
});
});

it("should allow a clean client to connect", function(done) {
var instance = this.instance;
var client = {
Expand Down Expand Up @@ -518,6 +540,18 @@ module.exports = function(create, buildOpts) {
});
});

it("should support multiple subscription command", function(done) {
var instance = this.instance;
instance.storeSubscriptions(client, function() {
instance.storeOfflinePacket(packet, function() {
instance.streamOfflinePackets(client, function(err, p) {
expect(p).to.eql(packet);
done();
});
});
});
});

it("should delete the offline packets once streamed", function(done) {
var instance = this.instance;
instance.storeOfflinePacket(packet, function() {
Expand Down Expand Up @@ -578,6 +612,27 @@ module.exports = function(create, buildOpts) {
});
});

it("should not store any offline packet for a client after lookup", function(done) {
var instance = this.instance;
var client = {
id: "my client id - 42",
clean: false,
logger: globalLogger,
subscriptions: {
hello: 1
}
};

instance.lookupSubscriptions(client, function(err, results) {
instance.storeOfflinePacket(packet, function() {
instance.streamOfflinePackets(client, function(err, p) {
done(new Error("this should never be called"));
});
done();
});
});
});

it("should not stream any offline packet to a clean client", function(done) {
var instance = this.instance;
var client = {
Expand Down

0 comments on commit e72aa51

Please sign in to comment.