Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Refactoring and API improvements for #168.
Browse files Browse the repository at this point in the history
Removed a function allocation for each message forward. This should
reduce the memory footprint in high-load applications.

@mocheng: please review this commit.
  • Loading branch information
mcollina committed Jul 7, 2014
1 parent 652fa1d commit a2594f7
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 40 deletions.
42 changes: 26 additions & 16 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,30 @@ Client.prototype.setUpTimer = function() {
*/
Client.prototype._buildForward = function() {
var that = this;

function doForward(err, packet) {
if (err) {
return that.emit('error', err);
}

that.server.authorizeForward(that, packet, function(err, authorized) {
if (err) {
return that.emit('error', err);
}

if (!authorized) {
that.logger.warn(packet, "Unauthorized Forward");
return;
}

that.connection.publish(packet);

if (packet.qos === 1) {
that.inflight[packet.messageId] = packet;
}
});
}

this.forward = function(topic, payload, options, subTopic, qos, cb) {
if (options._dedupId <= that._lastDedupId) {
return;
Expand Down Expand Up @@ -218,30 +242,16 @@ Client.prototype._buildForward = function() {
)
);

function doForward() {
that.server.authorizeForward(that, packet, qos, function(err, authorized) {
if (err || !authorized) {
return;
}

that.connection.publish(packet);

if (packet.qos === 1) {
that.inflight[packet.messageId] = packet;
}
});
}

if (forward) {
if (options._dedupId === undefined) {
options._dedupId = that.server.nextDedupId();
that._lastDedupId = options._dedupId;
}

if (qos && options.messageId) {
that.server.updateOfflinePacket(that, options.messageId, packet.messageId, doForward);
that.server.updateOfflinePacket(that, options.messageId, packet, doForward);
} else {
doForward();
doForward(null, packet);
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions lib/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ AbstractPersistence.prototype.wire = function(server) {
that.deleteOfflinePacket(client, messageId, cb);
};

server.updateOfflinePacket = function(client, messageId, newMessageId, cb) {
that.updateOfflinePacket(client, messageId, newMessageId, cb);
server.updateOfflinePacket = function(client, messageId, packet, cb) {
that.updateOfflinePacket(client, messageId, packet, cb);
};

server.forwardRetained = function(pattern, client, done) {
Expand Down
8 changes: 4 additions & 4 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ LevelUpPersistence.prototype.deleteOfflinePacket = function(client, messageId, d
}
};

LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, done) {
LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, done) {
var that = this;
var prefix = util.format('%s:', client.id);
var stream = that._offlinePackets.createReadStream({
Expand All @@ -303,11 +303,11 @@ LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, n

found = true;

data.value.messageId = newMessageId;
data.value.messageId = packet.messageId;

that._offlinePackets.put(data.key, data.value, function() {
if (done) {
done(null, data.value);
done(null, packet);
}
});
});
Expand All @@ -316,7 +316,7 @@ LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, n
stream.on("error", done);
stream.on("end", function() {
if (!found) {
done();
done(null, packet);
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,14 @@ MongoPersistence.prototype.deleteOfflinePacket = function(client, messageId, cb)
this._packets.remove(toSearch, {w:1}, cb);
};

MongoPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, cb) {
MongoPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, cb) {
this._packets.update({
client: client.id,
'packet.messageId': messageId
}, {
$set: { 'packet.messageId': newMessageId }
$set: { 'packet.messageId': packet.messageId }
}, {w:1}, function(err) {
cb(err);
cb(err, packet);
});
};

Expand Down
8 changes: 5 additions & 3 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,18 +379,20 @@ RedisPersistence.prototype.deleteOfflinePacket = function(client, messageId, don
.exec(done);
};

RedisPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, done) {
RedisPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, done) {
var that = this;
var oldPacketKey = "packets:" + client.id + ":" + messageId;
var newPacketKey = "packets:" + client.id + ":" + newMessageId;
var newPacketKey = "packets:" + client.id + ":" + packet.messageId;
var listKey = "packets:" + client.id;

that._client.multi()
.rename(oldPacketKey, newPacketKey)
.lrem(listKey, 1, oldPacketKey)
.rpush(listKey, newPacketKey)
.pexpire(listKey, this._listKeyTTL)
.exec(done);
.exec(function(err) {
done(err, packet);
});
};

RedisPersistence.prototype.close = function(done) {
Expand Down
15 changes: 7 additions & 8 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,11 @@ Server.prototype.authorizeSubscribe = function(client, topic, callback) {
* Override at will
*
* @api public
* @param {Object} client The MQTTConnection that is a client
* @param {Object} packet The packet
* @param {Number} qos The QoS
* @param {Function} callback The callback to return the verdict
* @param {Object} client The MQTTConnection that is a client.
* @param {Object} packet The packet to be published.
* @param {Function} callback The callback to return the authorization flag.
*/
Server.prototype.authorizeForward = function(client, packet, qos, callback) {
Server.prototype.authorizeForward = function(client, packet, callback) {
callback(null, true);
};

Expand Down Expand Up @@ -465,12 +464,12 @@ Server.prototype.forwardOfflinePackets = function(client, callback) {
*
* @param {MoscaClient} client
* @param {Integer} originMessageId The original message id
* @param {Integer} newMessageId The new message id
* @param {Object} packet The new packet
* @param {Function} callback
*/
Server.prototype.updateOfflinePacket = function(client, originMessageId, newMessageId, callback) {
Server.prototype.updateOfflinePacket = function(client, originMessageId, packet, callback) {
if (callback) {
callback();
callback(null, packet);
}
};

Expand Down
6 changes: 3 additions & 3 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -1255,12 +1255,12 @@ module.exports = function(moscaSettings, createConnection) {
});
});

it("should not forward packet if authorizeForward returns false", function(done) {
it("should not forward packet if authorizeForward do not call the callback", function(done) {
var d = donner(2, done);
var that = this;

this.instance.authorizeForward = function(client, packet, qos, callback) {
callback(null, packet.topic != 'stop_forward');
this.instance.authorizeForward = function(client, packet, callback) {
callback(null, packet.topic !== 'stop_forward');
};

buildAndConnect(d, buildOpts(), function(client1) {
Expand Down
4 changes: 3 additions & 1 deletion test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,9 @@ module.exports = function(create, buildOpts) {
var instance = this.instance;
instance.storeOfflinePacket(packet, function() {
instance.streamOfflinePackets(client, function(err, p3) {
instance.updateOfflinePacket(client, p3.messageId, 12345, function(err) {
var p4 = Object.create(p3);
p4.messageId = 12345;
instance.updateOfflinePacket(client, p3.messageId, p4, function(err) {
instance.streamOfflinePackets(client, function(err, p2) {
expect(p2.messageId).to.equal(12345);
done();
Expand Down

2 comments on commit a2594f7

@mocheng
Copy link
Contributor

@mocheng mocheng commented on a2594f7 Jul 7, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

@mcollina
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I also removed the qos param from the authorizeForward method, but it's on the packet anyway.

Please sign in to comment.