Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #88 from mcollina/server.publish
Browse files Browse the repository at this point in the history
Added server.publish().
  • Loading branch information
mcollina committed Feb 6, 2014
2 parents 9ec3064 + 17b4c79 commit f4f337a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 27 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,24 @@ server.on('published', function(packet, client) {
});
```

### mosca.Server#publish()

The `publish()` function allows to programatically publish a value to
MQTT clients with full support of all distinctive MQTT features:
offline, quality of server, and retained messages.

```javascript
var message = {
topic: '/hello/world',
payload: 'abcde', // or a Buffer
qos: 0, // 0, 1, or 2
retain: false // or true
};

server.publish(message, function() {
console.log('done!');
});
```

### How Mosca works

Expand Down
31 changes: 5 additions & 26 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,34 +410,13 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) {
return;
}

var options = {
qos: packet.qos,
mosca: {
client: this, // the client object
packet: packet // the packet being sent
}
};

that.server.ascoltatore.publish(
packet.topic,
packet.payload,
options,
function() {
that.logger.info({ packet: packet }, "published packet");

that.server.storePacket(packet, function() {
that.server.published(packet, that, function() {
if (packet.qos === 1 && !(that._closed || that._closing)) {
that.connection.puback({
messageId: packet.messageId
});
}

that.server.emit("published", packet, that);
});
that.server.publish(packet, that, function() {
if (packet.qos === 1 && !(that._closed || that._closing)) {
that.connection.puback({
messageId: packet.messageId
});
}
);
});
};

/**
Expand Down
56 changes: 55 additions & 1 deletion lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var defaults = {
}
}
};
var nop = function() {};

/**
* The Mosca Server is a very simple MQTT server that
Expand Down Expand Up @@ -93,7 +94,7 @@ function Server(opts, callback) {

this._dedupId = 0;
this.clients = {};

if (this.opts.logger.childOf) {
this.logger = this.opts.logger.childOf;
delete this.opts.logger.childOf;
Expand Down Expand Up @@ -220,6 +221,59 @@ module.exports = Server;

Server.prototype = Object.create(EventEmitter.prototype);

Server.prototype.toString = function() {
return 'mosca.Server';
};

/**
* Publishes a packet on the MQTT broker.
*
* @api public
* @param {Object} packet The MQTT packet, it should include the
* topic, payload, qos, and retain keys.
* @param {Object} client The client object (internal)
* @param {String} password The password
*/
Server.prototype.publish = function publish(packet, client, callback) {

var that = this;
var logger = this.logger;

if (typeof client === 'function') {
callback = client;
client = null;
} else if (client) {
logger = client.logger;
}

if (!callback) {
callback = nop;
}

var options = {
qos: packet.qos,
mosca: {
client: client, // the client object
packet: packet // the packet being sent
}
};

this.ascoltatore.publish(
packet.topic,
packet.payload,
options,
function() {
that.storePacket(packet, function() {
that.published(packet, that, function() {
logger.info({ packet: packet }, "published packet");
that.emit("published", packet, that);
callback();
});
});
}
);
};

/**
* The function that will be used to authenticate users.
* This default implementation authenticate everybody.
Expand Down
30 changes: 30 additions & 0 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -1600,4 +1600,34 @@ module.exports = function(moscaSettings, createConnection) {
buildTest("/test//topic", "/test//topic");
buildTest("/test/+/topic", "/test//topic", false);
});

it("should allow plugin authors to publish", function(done) {
buildAndConnect(done, function(client) {

var messageId = Math.floor(65535 * Math.random());
var subscriptions = [{
topic: "hello",
qos: 1
}, {
topic: "hello2",
qos: 0
}
];

client.on("suback", function(packet) {
instance.publish({ topic: "hello", payload: "world" });
});

client.on("publish", function(packet) {
expect(packet).to.have.property("topic", "hello");
expect(packet).to.have.property("payload", "world");
client.disconnect();
});

client.subscribe({
subscriptions: subscriptions,
messageId: messageId
});
});
});
};

0 comments on commit f4f337a

Please sign in to comment.