diff --git a/.travis.yml b/.travis.yml index 625d598..c254d3e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,4 +6,6 @@ services: - redis-server - mongodb script: + - npm run coverage +after_success: - npm run publish-coverage diff --git a/lib/stats.js b/lib/stats.js index 5f8c4df..554ed10 100644 --- a/lib/stats.js +++ b/lib/stats.js @@ -26,6 +26,7 @@ OTHER DEALINGS IN THE SOFTWARE. "use strict"; var moment = require("moment"); +var movingAverage = require("moving-average"); var version = "mosca " + require("../package").version; /** @@ -33,17 +34,39 @@ var version = "mosca " + require("../package").version; * * @api private */ -function Load() { - this.publishedMessages = 0; - this.connectedClients = 0; +function Load(minutes) { + this.maPublishedMessages = movingAverage(minutes * 60 * 1000); + this.maPublishedMessages.push(Date.now(), 0); + this.maConnectedClients = movingAverage(minutes * 60 * 1000); + this.maConnectedClients.push(Date.now(), 0); } +Object.defineProperties(Load.prototype, { + "publishedMessages": { + get: function () { + var value = this.maPublishedMessages.movingAverage(); + value = Math.round(value * 100) / 100; + return value; + } + }, + "connectedClients": { + get: function () { + var value = this.maConnectedClients.movingAverage(); + value = Math.round(value * 100) / 100; + return value; + } + } +}); + /** * A Stats object is used to keep track of the state of a mosca.Server * and can be wired() there. * * It provides the following stats: * - connectedClients: the number of connected clients at this point in time + * - publishedMessages: the number of publish messages received since the the start + * + * It also track the load at 1min, 5min, and 15min of the same events. * * @api public */ @@ -54,15 +77,13 @@ function Stats() { this.connectedClients = 0; this.publishedMessages = 0; + this.lastIntervalPublishedMessages = 0; this.started = new Date(); - this.current = { - m15: new Load(), - m1: new Load() - }; this.load = { - m15: new Load(), - m1: new Load() + m15: new Load(15), + m5: new Load(5), + m1: new Load(1) }; } @@ -75,8 +96,6 @@ function Stats() { function clientConnected() { /*jshint validthis:true */ this.stats.connectedClients++; - this.stats.current.m1.connectedClients++; - this.stats.current.m15.connectedClients++; } /** @@ -99,8 +118,7 @@ function clientDisconnected() { function published() { /*jshint validthis:true */ this.stats.publishedMessages++; - this.stats.current.m1.publishedMessages++; - this.stats.current.m15.publishedMessages++; + this.stats.lastIntervalPublishedMessages++; } /** @@ -137,27 +155,33 @@ Stats.prototype.wire = function wire(server) { var timer = setInterval(function() { var stats = server.stats; var mem = process.memoryUsage(); - stats.load.m1 = stats.current.m1; - stats.current.m1 = new Load(); - if (++count % 15 === 0) { - stats.load.m15 = stats.current.m15; - stats.current.m15 = new Load(); - count = 0; - } + var date = new Date(); + + stats.load.m1.maConnectedClients.push(date, stats.connectedClients); + stats.load.m5.maConnectedClients.push(date, stats.connectedClients); + stats.load.m15.maConnectedClients.push(date, stats.connectedClients); + + stats.load.m1.maPublishedMessages.push(date, stats.lastIntervalPublishedMessages); + stats.load.m5.maPublishedMessages.push(date, stats.lastIntervalPublishedMessages); + stats.load.m15.maPublishedMessages.push(date, stats.lastIntervalPublishedMessages); + stats.lastIntervalPublishedMessages = 0; doPublish("version", version); + doPublish("started_at", server.stats.started.toISOString()); doPublish("uptime", mom.from(Date.now(), true)); - doPublish("connectedClients", stats.connectedClients); - doPublish("publishedMessages", stats.publishedMessages); - doPublish("load/15m/connectedClients", stats.load.m15.connectedClients); - doPublish("load/15m/publishedMessages", stats.load.m15.publishedMessages); - doPublish("load/1m/connectedClients", stats.load.m1.connectedClients); - doPublish("load/1m/publishedMessages", stats.load.m1.publishedMessages); + doPublish("clients/connected", stats.connectedClients); + doPublish("publish/received", stats.publishedMessages); + doPublish("load/connections/15min", stats.load.m15.connectedClients); + doPublish("load/publish/received/15min", stats.load.m15.publishedMessages); + doPublish("load/connections/5min", stats.load.m5.connectedClients); + doPublish("load/publish/received/5min", stats.load.m5.publishedMessages); + doPublish("load/connections/1min", stats.load.m1.connectedClients); + doPublish("load/publish/received/1min", stats.load.m1.publishedMessages); doPublish("memory/rss", mem.rss); - doPublish("memory/heapUsed", mem.heapUsed); - doPublish("memory/heapTotal", mem.heapTotal); - }, 60 * 1000); + doPublish("memory/heap/current", mem.heapUsed); + doPublish("memory/heap/maximum", mem.heapTotal); + }, 10 * 1000); events.forEach(function(event) { server.on(event.name, event); diff --git a/package.json b/package.json index ec35984..39bb9ea 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "test": "mocha --recursive --bail --reporter spec test", "ci": "mocha --recursive --bail --watch test", "coverage": "rm -rf coverage; istanbul cover _mocha -- --recursive --reporter spec --bail", - "publish-coverage": "npm run coverage && (cat coverage/lcov.info | coveralls)", + "publish-coverage": "(cat coverage/lcov.info | coveralls)", "jshint-lib": "jshint lib/*.js", "jshint-test": "jshint test/*.js", "start": "./bin/mosca -v | bunyan" @@ -84,7 +84,8 @@ "json-buffer": "~2.0.7", "brfs": "0.0.8", "pre-commit": "0.0.4", - "moment": "~2.5.1" + "moment": "~2.5.1", + "moving-average": "0.0.4" }, "optionalDependencies": { "leveldown": "~0.10.0", diff --git a/test/abstract_server.js b/test/abstract_server.js index 8924ec1..662f526 100644 --- a/test/abstract_server.js +++ b/test/abstract_server.js @@ -1705,7 +1705,7 @@ module.exports = function(moscaSettings, createConnection) { it("should publish data each minute", function(done) { buildAndConnect(done, function(client1) { - var topic = "$SYS/" + instance.id + "/connectedClients"; + var topic = "$SYS/" + instance.id + "/clients/connected"; instance.ascoltatore.subscribe(topic, function(topic, value) { expect(value).to.eql("1"); client1.disconnect(); diff --git a/test/stats_spec.js b/test/stats_spec.js index ee2f608..6fc4dfe 100644 --- a/test/stats_spec.js +++ b/test/stats_spec.js @@ -4,6 +4,7 @@ describe("mosca.Stats", function() { var instance; var server; var clock; + var interval = 10; beforeEach(function() { clock = sinon.useFakeTimers(); @@ -46,18 +47,18 @@ describe("mosca.Stats", function() { expect(instance.connectedClients).to.eql(3); }); - it("should publish it every minute", function(done) { + it("should publish it every 10s", function(done) { server.emit("clientConnected"); server.emit("clientConnected"); server.on("testPublished", function(packet) { - if (packet.topic === "$SYS/42/connectedClients") { + if (packet.topic === "$SYS/42/clients/connected") { expect(packet.payload).to.eql("2"); done(); } }); - clock.tick(60 * 1000); + clock.tick(interval * 1000); }); }); @@ -78,29 +79,58 @@ describe("mosca.Stats", function() { expect(instance.publishedMessages).to.eql(2); }); - it("should publish it every minute", function(done) { + it("should publish it every 10s", function(done) { server.emit("published"); server.emit("published"); server.emit("published"); server.on("testPublished", function(packet) { - if (packet.topic === "$SYS/42/publishedMessages") { + if (packet.topic === "$SYS/42/publish/received") { expect(packet.payload).to.eql("3"); done(); } }); - clock.tick(60 * 1000); + clock.tick(interval * 1000); }); }); describe("tracking load", function() { + var toBeCleared; + + afterEach(function() { + if (toBeCleared) { + clearInterval(toBeCleared); + } + }); + var events = { published: "publishedMessages", clientConnected: "connectedClients" }; + var topics = { + published: "/load/publish/received/", + clientConnected: "/load/connections/", + }; + + var buildTimer = { + published: function() { + return setInterval(function() { + server.emit("published"); + server.emit("published"); + }, interval * 1000); + }, + clientConnected: function(minutes) { + server.emit("clientConnected"); + server.emit("clientConnected"); + + return setInterval(function() { + }, interval * 1000); + } + }; + Object.keys(events).forEach(function(event) { describe(event, function() { @@ -113,40 +143,62 @@ describe("mosca.Stats", function() { }); it("should cover the last 15 minutes", function() { - server.emit(event); - server.emit(event); + toBeCleared = buildTimer[event](15); clock.tick(15 * 60 * 1000 + 1); expect(instance.load.m15[events[event]]).to.eql(2); }); - it("should show only the data in the previous interval", function() { - server.emit(event); - server.emit(event); - clock.tick(16 * 60 * 1000); - clock.tick(16 * 60 * 1000); - expect(instance.load.m15[events[event]]).to.eql(0); + it("should publish it", function(done) { + toBeCleared = buildTimer[event](15); + + var count = 0; + + server.on("testPublished", function(packet) { + if (packet.topic === "$SYS/42" + topics[event] + "15min") { + count++; + + if (count % (15 * 6) === 0) { + expect(packet.payload).to.eql("2"); + done(); + } + } + }); + + clock.tick(60 * 1000 * 15); }); + }); + + describe("m5", function() { - it("should publish it every minute", function(done) { + it("should start from zero", function() { server.emit(event); server.emit(event); + expect(instance.load.m5[events[event]]).to.eql(0); + }); + + it("should cover the last 15 minutes", function() { + toBeCleared = buildTimer[event](5); + clock.tick(5 * 60 * 1000 + 1); + expect(instance.load.m5[events[event]]).to.eql(2); + }); + + it("should publish it", function(done) { + toBeCleared = buildTimer[event](5); var count = 0; server.on("testPublished", function(packet) { - if (packet.topic === "$SYS/42/load/15m/" + events[event]) { + if (packet.topic === "$SYS/42" + topics[event] + "5min") { count++; - if (++count % 15 === 0) { + if (count % (5 * 6) === 0) { expect(packet.payload).to.eql("2"); done(); - } else { - expect(packet.payload).to.eql("0"); } } }); - clock.tick(60 * 1000 * 15); + clock.tick(60 * 1000 * 5); }); }); @@ -159,28 +211,24 @@ describe("mosca.Stats", function() { }); it("should cover the last minute", function() { - server.emit(event); - server.emit(event); + toBeCleared = buildTimer[event](1); clock.tick(60 * 1000 + 1); expect(instance.load.m1[events[event]]).to.eql(2); }); - it("should show only the data in the previous interval", function() { - server.emit(event); - server.emit(event); - clock.tick(60 * 1000); - clock.tick(60 * 1000); - expect(instance.load.m1[events[event]]).to.eql(0); - }); + it("should publish it", function(done) { + toBeCleared = buildTimer[event](1); - it("should publish it every minute", function(done) { - server.emit(event); - server.emit(event); + var count = 0; server.on("testPublished", function(packet) { - if (packet.topic === "$SYS/42/load/1m/" + events[event]) { - expect(packet.payload).to.eql("2"); - done(); + if (packet.topic === "$SYS/42" + topics[event] + "1min") { + count++; + + if (count % 6 === 0) { + expect(packet.payload).to.eql("2"); + done(); + } } }); @@ -200,7 +248,7 @@ describe("mosca.Stats", function() { }); }); - it("should publish the version", function(done) { + it("should publish the version every 10s", function(done) { var version = require("../package").version; server.on("testPublished", function(packet) { if (packet.topic === "$SYS/42/version") { @@ -209,28 +257,38 @@ describe("mosca.Stats", function() { } }); - clock.tick(60 * 1000); + clock.tick(interval * 1000); + }); + + it("should publish the start time", function(done) { + server.on("testPublished", function(packet) { + if (packet.topic === "$SYS/42/started_at") { + expect(packet.payload).to.eql(instance.started.toISOString()); + done(); + } + }); + + clock.tick(interval * 1000); }); - it("should publish the uptime", function(done) { + it("should publish the uptime every 10s", function(done) { server.on("testPublished", function(packet) { if (packet.topic === "$SYS/42/uptime") { - expect(packet.payload).to.eql("a minute"); + expect(packet.payload).to.eql("a few seconds"); done(); } }); - clock.tick(60 * 1000); + clock.tick(interval * 1000); }); it("should publish the uptime (bis)", function(done) { - var count = 0; - clock.tick(60 * 1000 * 2); - server.on("testPublished", function(packet) { - if (packet.topic === "$SYS/42/uptime") { - expect(packet.payload).to.eql("3 minutes"); + server.on("testPublished", function func(packet) { + if (packet.topic === "$SYS/42/uptime" && + packet.payload === "3 minutes") { + server.removeListener("testPublished", func); done(); } }); @@ -250,17 +308,23 @@ describe("mosca.Stats", function() { stub.restore(); }); - ["rss", "heapTotal", "heapUsed"].forEach(function(event) { - it("should publish " + event + " every minute", function(done) { + var stats = { + rss: "rss", + heapTotal: "heap/maximum", + heapUsed: "heap/current" + }; + + Object.keys(stats).forEach(function(stat) { + it("should publish " + stat + " every minute", function(done) { server.on("testPublished", function(packet) { var mem = process.memoryUsage(); - if (packet.topic === "$SYS/42/memory/" + event) { - expect(packet.payload).to.eql("" + mem[event]); + if (packet.topic === "$SYS/42/memory/" + stats[stat]) { + expect(packet.payload).to.eql("" + mem[stat]); done(); } }); - clock.tick(60 * 1000); + clock.tick(interval * 1000); }); }); });