diff --git a/README.md b/README.md index 8441d6b..b11a7c4 100644 --- a/README.md +++ b/README.md @@ -259,6 +259,24 @@ server.on('published', function(packet, client) { }); ``` +### mosca.Server#publish() + +The `publish()` function allows to programatically publish a value to +MQTT clients with full support of all distinctive MQTT features: +offline, quality of server, and retained messages. + +```javascript +var message = { + topic: '/hello/world', + payload: 'abcde', // or a Buffer + qos: 0, // 0, 1, or 2 + retain: false // or true +}; + +server.publish(message, function() { + console.log('done!'); +}); +``` ### How Mosca works diff --git a/lib/client.js b/lib/client.js index 2651571..64cc294 100644 --- a/lib/client.js +++ b/lib/client.js @@ -410,34 +410,13 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { return; } - var options = { - qos: packet.qos, - mosca: { - client: this, // the client object - packet: packet // the packet being sent - } - }; - - that.server.ascoltatore.publish( - packet.topic, - packet.payload, - options, - function() { - that.logger.info({ packet: packet }, "published packet"); - - that.server.storePacket(packet, function() { - that.server.published(packet, that, function() { - if (packet.qos === 1 && !(that._closed || that._closing)) { - that.connection.puback({ - messageId: packet.messageId - }); - } - - that.server.emit("published", packet, that); - }); + that.server.publish(packet, that, function() { + if (packet.qos === 1 && !(that._closed || that._closing)) { + that.connection.puback({ + messageId: packet.messageId }); } - ); + }); }; /** diff --git a/lib/server.js b/lib/server.js index c612163..daeb15c 100644 --- a/lib/server.js +++ b/lib/server.js @@ -28,6 +28,7 @@ var defaults = { } } }; +var nop = function() {}; /** * The Mosca Server is a very simple MQTT server that @@ -93,7 +94,7 @@ function Server(opts, callback) { this._dedupId = 0; this.clients = {}; - + if (this.opts.logger.childOf) { this.logger = this.opts.logger.childOf; delete this.opts.logger.childOf; @@ -220,6 +221,59 @@ module.exports = Server; Server.prototype = Object.create(EventEmitter.prototype); +Server.prototype.toString = function() { + return 'mosca.Server'; +}; + +/** + * Publishes a packet on the MQTT broker. + * + * @api public + * @param {Object} packet The MQTT packet, it should include the + * topic, payload, qos, and retain keys. + * @param {Object} client The client object (internal) + * @param {String} password The password + */ +Server.prototype.publish = function publish(packet, client, callback) { + + var that = this; + var logger = this.logger; + + if (typeof client === 'function') { + callback = client; + client = null; + } else if (client) { + logger = client.logger; + } + + if (!callback) { + callback = nop; + } + + var options = { + qos: packet.qos, + mosca: { + client: client, // the client object + packet: packet // the packet being sent + } + }; + + this.ascoltatore.publish( + packet.topic, + packet.payload, + options, + function() { + that.storePacket(packet, function() { + that.published(packet, that, function() { + logger.info({ packet: packet }, "published packet"); + that.emit("published", packet, that); + callback(); + }); + }); + } + ); +}; + /** * The function that will be used to authenticate users. * This default implementation authenticate everybody. diff --git a/test/abstract_server.js b/test/abstract_server.js index 4c78b65..290b936 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -1600,4 +1600,34 @@ module.exports = function(moscaSettings, createConnection) { buildTest("/test//topic", "/test//topic"); buildTest("/test/+/topic", "/test//topic", false); }); + + it("should allow plugin authors to publish", function(done) { + buildAndConnect(done, function(client) { + + var messageId = Math.floor(65535 * Math.random()); + var subscriptions = [{ + topic: "hello", + qos: 1 + }, { + topic: "hello2", + qos: 0 + } + ]; + + client.on("suback", function(packet) { + instance.publish({ topic: "hello", payload: "world" }); + }); + + client.on("publish", function(packet) { + expect(packet).to.have.property("topic", "hello"); + expect(packet).to.have.property("payload", "world"); + client.disconnect(); + }); + + client.subscribe({ + subscriptions: subscriptions, + messageId: messageId + }); + }); + }); };