diff --git a/lib/client.js b/lib/client.js index 9d9da57..d762941 100644 --- a/lib/client.js +++ b/lib/client.js @@ -170,6 +170,30 @@ Client.prototype.setUpTimer = function() { */ Client.prototype._buildForward = function() { var that = this; + + function doForward(err, packet) { + if (err) { + return that.emit('error', err); + } + + that.server.authorizeForward(that, packet, function(err, authorized) { + if (err) { + return that.emit('error', err); + } + + if (!authorized) { + that.logger.warn(packet, "Unauthorized Forward"); + return; + } + + that.connection.publish(packet); + + if (packet.qos === 1) { + that.inflight[packet.messageId] = packet; + } + }); + } + this.forward = function(topic, payload, options, subTopic, qos, cb) { if (options._dedupId <= that._lastDedupId) { return; @@ -218,20 +242,6 @@ Client.prototype._buildForward = function() { ) ); - function doForward() { - that.server.authorizeForward(that, packet, qos, function(err, authorized) { - if (err || !authorized) { - return; - } - - that.connection.publish(packet); - - if (packet.qos === 1) { - that.inflight[packet.messageId] = packet; - } - }); - } - if (forward) { if (options._dedupId === undefined) { options._dedupId = that.server.nextDedupId(); @@ -239,9 +249,9 @@ Client.prototype._buildForward = function() { } if (qos && options.messageId) { - that.server.updateOfflinePacket(that, options.messageId, packet.messageId, doForward); + that.server.updateOfflinePacket(that, options.messageId, packet, doForward); } else { - doForward(); + doForward(null, packet); } } }; diff --git a/lib/persistence/abstract.js b/lib/persistence/abstract.js index bbceb40..47011ce 100644 --- a/lib/persistence/abstract.js +++ b/lib/persistence/abstract.js @@ -65,8 +65,8 @@ AbstractPersistence.prototype.wire = function(server) { that.deleteOfflinePacket(client, messageId, cb); }; - server.updateOfflinePacket = function(client, messageId, newMessageId, cb) { - that.updateOfflinePacket(client, messageId, newMessageId, cb); + server.updateOfflinePacket = function(client, messageId, packet, cb) { + that.updateOfflinePacket(client, messageId, packet, cb); }; server.forwardRetained = function(pattern, client, done) { diff --git a/lib/persistence/levelup.js b/lib/persistence/levelup.js index f7357d2..3ed778f 100644 --- a/lib/persistence/levelup.js +++ b/lib/persistence/levelup.js @@ -287,7 +287,7 @@ LevelUpPersistence.prototype.deleteOfflinePacket = function(client, messageId, d } }; -LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, done) { +LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, done) { var that = this; var prefix = util.format('%s:', client.id); var stream = that._offlinePackets.createReadStream({ @@ -303,11 +303,11 @@ LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, n found = true; - data.value.messageId = newMessageId; + data.value.messageId = packet.messageId; that._offlinePackets.put(data.key, data.value, function() { if (done) { - done(null, data.value); + done(null, packet); } }); }); @@ -316,7 +316,7 @@ LevelUpPersistence.prototype.updateOfflinePacket = function(client, messageId, n stream.on("error", done); stream.on("end", function() { if (!found) { - done(); + done(null, packet); } }); } diff --git a/lib/persistence/mongo.js b/lib/persistence/mongo.js index a54188e..403c1d9 100644 --- a/lib/persistence/mongo.js +++ b/lib/persistence/mongo.js @@ -309,14 +309,14 @@ MongoPersistence.prototype.deleteOfflinePacket = function(client, messageId, cb) this._packets.remove(toSearch, {w:1}, cb); }; -MongoPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, cb) { +MongoPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, cb) { this._packets.update({ client: client.id, 'packet.messageId': messageId }, { - $set: { 'packet.messageId': newMessageId } + $set: { 'packet.messageId': packet.messageId } }, {w:1}, function(err) { - cb(err); + cb(err, packet); }); }; diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index d2c239c..40ae09a 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -379,10 +379,10 @@ RedisPersistence.prototype.deleteOfflinePacket = function(client, messageId, don .exec(done); }; -RedisPersistence.prototype.updateOfflinePacket = function(client, messageId, newMessageId, done) { +RedisPersistence.prototype.updateOfflinePacket = function(client, messageId, packet, done) { var that = this; var oldPacketKey = "packets:" + client.id + ":" + messageId; - var newPacketKey = "packets:" + client.id + ":" + newMessageId; + var newPacketKey = "packets:" + client.id + ":" + packet.messageId; var listKey = "packets:" + client.id; that._client.multi() @@ -390,7 +390,9 @@ RedisPersistence.prototype.updateOfflinePacket = function(client, messageId, new .lrem(listKey, 1, oldPacketKey) .rpush(listKey, newPacketKey) .pexpire(listKey, this._listKeyTTL) - .exec(done); + .exec(function(err) { + done(err, packet); + }); }; RedisPersistence.prototype.close = function(done) { diff --git a/lib/server.js b/lib/server.js index 5258cb5..cb5b7d0 100644 --- a/lib/server.js +++ b/lib/server.js @@ -376,12 +376,11 @@ Server.prototype.authorizeSubscribe = function(client, topic, callback) { * Override at will * * @api public - * @param {Object} client The MQTTConnection that is a client - * @param {Object} packet The packet - * @param {Number} qos The QoS - * @param {Function} callback The callback to return the verdict + * @param {Object} client The MQTTConnection that is a client. + * @param {Object} packet The packet to be published. + * @param {Function} callback The callback to return the authorization flag. */ -Server.prototype.authorizeForward = function(client, packet, qos, callback) { +Server.prototype.authorizeForward = function(client, packet, callback) { callback(null, true); }; @@ -465,12 +464,12 @@ Server.prototype.forwardOfflinePackets = function(client, callback) { * * @param {MoscaClient} client * @param {Integer} originMessageId The original message id - * @param {Integer} newMessageId The new message id + * @param {Object} packet The new packet * @param {Function} callback */ -Server.prototype.updateOfflinePacket = function(client, originMessageId, newMessageId, callback) { +Server.prototype.updateOfflinePacket = function(client, originMessageId, packet, callback) { if (callback) { - callback(); + callback(null, packet); } }; diff --git a/test/abstract_server.js b/test/abstract_server.js index fc74d81..a438b58 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -1255,12 +1255,12 @@ module.exports = function(moscaSettings, createConnection) { }); }); - it("should not forward packet if authorizeForward returns false", function(done) { + it("should not forward packet if authorizeForward do not call the callback", function(done) { var d = donner(2, done); var that = this; - this.instance.authorizeForward = function(client, packet, qos, callback) { - callback(null, packet.topic != 'stop_forward'); + this.instance.authorizeForward = function(client, packet, callback) { + callback(null, packet.topic !== 'stop_forward'); }; buildAndConnect(d, buildOpts(), function(client1) { diff --git a/test/persistence/abstract.js b/test/persistence/abstract.js index 69b0b86..51198b9 100644 --- a/test/persistence/abstract.js +++ b/test/persistence/abstract.js @@ -656,7 +656,9 @@ module.exports = function(create, buildOpts) { var instance = this.instance; instance.storeOfflinePacket(packet, function() { instance.streamOfflinePackets(client, function(err, p3) { - instance.updateOfflinePacket(client, p3.messageId, 12345, function(err) { + var p4 = Object.create(p3); + p4.messageId = 12345; + instance.updateOfflinePacket(client, p3.messageId, p4, function(err) { instance.streamOfflinePackets(client, function(err, p2) { expect(p2.messageId).to.equal(12345); done();