diff --git a/examples/Server_With_All_Interfaces-Settings.js b/examples/Server_With_All_Interfaces-Settings.js index 4ab7680..f6f5334 100644 --- a/examples/Server_With_All_Interfaces-Settings.js +++ b/examples/Server_With_All_Interfaces-Settings.js @@ -19,6 +19,7 @@ var moscaSetting = { { type: "https", port: 3001, bundle: true, credentials: { keyPath: SECURE_KEY, certPath: SECURE_CERT } } ], stats: false, + onQoS2publish: 'noack', // can set to 'disconnect', or to 'dropToQoS1' if using a client which will eat puback for QOS 2; e.g. mqtt.js logger: { name: 'MoscaServer', level: 'debug' }, diff --git a/lib/client.js b/lib/client.js index 6f6dbb7..39822e6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -535,6 +535,27 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { packet.payload = success; } + // Mosca does not support QoS2 + // if onQoS2publish === 'dropToQoS1', don't just ignore QoS2 message, puback it + // by converting internally to qos 1. + // this fools mqtt.js into not holding all messages forever + // if onQoS2publish === 'disconnect', then break the client connection if QoS2 + if (packet.qos === 2){ + switch(that.server.onQoS2publish){ + case 'dropToQoS1': + packet.qos = 1; + break; + case 'disconnect': + if (!this._closed && !this._closing) { + that.close(null, "qos2 caused disconnect"); + } + return; + break; + default: + break; + } + } + var dopuback = function() { if (packet.qos === 1 && !(that._closed || that._closing)) { that.connection.puback({ @@ -542,8 +563,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { }); } }; - - + // if success is passed as 'ignore', ack but don't publish. if (success !== 'ignore'){ // publish message @@ -552,6 +572,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) { // ignore but acknowledge message dopuback(); } + }; /** diff --git a/lib/options.js b/lib/options.js index 1737484..56c7a6e 100755 --- a/lib/options.js +++ b/lib/options.js @@ -62,7 +62,8 @@ function modernize(legacy) { "stats", "publishNewClient", "publishClientDisconnect", - "publishSubscriptions" + "publishSubscriptions", + "onQoS2publish" ]; // copy all conserved options @@ -252,7 +253,8 @@ function validate(opts, validationOptions) { 'stats': { type: 'boolean' }, 'publishNewClient': { type: 'boolean' }, 'publishClientDisconnect': { type: 'boolean' }, - 'publishSubscriptions': { type: 'boolean' } + 'publishSubscriptions': { type: 'boolean' }, + 'onQoS2publish': { type: 'string' }, } }); @@ -330,6 +332,7 @@ function defaultsLegacy() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, + onQoS2publish: 'noack', logger: { name: "mosca", level: "warn", @@ -366,6 +369,7 @@ function defaultsModern() { publishClientDisconnect: true, publishSubscriptions: true, maxInflightMessages: 1024, + onQoS2publish: 'noack', logger: { name: "mosca", level: "warn", diff --git a/lib/server.js b/lib/server.js index 4edee03..abc17a5 100755 --- a/lib/server.js +++ b/lib/server.js @@ -158,6 +158,9 @@ function Server(opts, callback) { var that = this; + // put QOS-2 spoofing as a variable direct on server + this.onQoS2publish = this.modernOpts.onQoS2publish; + // each Server has a dummy id for logging purposes this.id = this.modernOpts.id || shortid.generate(); diff --git a/test/abstract_server.js b/test/abstract_server.js index f32ad2c..470f0b4 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -956,6 +956,139 @@ module.exports = function(moscaSettings, createConnection) { }); }); + it("should by default not puback client publish to QOS 2", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("testQOS2"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "testQOS2", + payload: "publish expected", + qos: 2 + }); + + // allow 1 second to hear puback + timer = setTimeout(function(){ + client.disconnect(); + }, 1000); + + // default QOS 2 should NOT puback + client.on("puback", function() { + count++; + //expect(count).to.eql(1); + client.disconnect(); + }); + client.on("close", function() { + expect(count).to.eql(0); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + + + it("should optionally (onQoS2publish='dropToQoS1') puback client publish to QOS 2", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.onQoS2publish = 'dropToQoS1'; + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("testQOS2"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "testQOS2", + payload: "publish expected", + qos: 2 + }); + + // allow 1 second to hear puback + timer = setTimeout(function(){ + client.disconnect(); + }, 1000); + + // with maxqos=1, QOS 2 should puback + client.on("puback", function() { + count++; + expect(count).to.eql(1); + client.disconnect(); + }); + client.on("close", function() { + expect(count).to.eql(1); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + +it("should optionally (onQoS2publish='disconnect') disconnect client on publish of QOS2 message", function(done) { + var onPublishedCalled = false; + var clientId; + var count = 0; + var timer; + + instance.onQoS2publish = 'disconnect'; + instance.published = function(packet, serverClient, callback) { + onPublishedCalled = true; + expect(packet.topic).to.be.equal("should not have published"); + callback(); + }; + + buildAndConnect(done, function(client) { + clientId = client.opts.clientId; + + client.publish({ + messageId: 42, + topic: "QOS2Test", + payload: "some data to cause close", + qos: 2 + }); + + // if after 2 seconds, we've not closed + timer = setTimeout(function(){ + var test = false; + expect(count).to.eql(0); + expect(test).to.eql(true); + client.disconnect(); + }, 2000); + + // onQoS2publish = 'disconnect' should NOT puback + client.on("puback", function() { + expect(onPublishedCalled).to.eql(false); + count++; + expect(count).to.eql(0); + client.disconnect(); + }); + client.on("close", function() { + expect(onPublishedCalled).to.eql(false); + expect(count).to.eql(0); + client.disconnect(); + clearTimeout(timer); + }); + }); + }); + + + it("should emit an event when a new client is connected", function(done) { buildClient(done, function(client) {