Skip to content

Commit

Permalink
WIP: cluster metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zbjornson committed Aug 27, 2017
1 parent 8cf0bd9 commit d6f2cf6
Show file tree
Hide file tree
Showing 17 changed files with 642 additions and 24 deletions.
9 changes: 8 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"plugins": ["prettier"],
"extends": "eslint:recommended",
"env": {
"node": true
"node": true,
"es6": true
},
"parserOptions": {
"ecmaVersion": 2015
Expand Down Expand Up @@ -55,6 +56,12 @@
"no-shadow": "off",
"no-unused-expressions": "off"
}
},
{
"files": ["example/**/*.js"],
"rules": {
"no-console": "off"
}
}
]
}
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ A prometheus client for node.js that supports histogram, summaries, gauges and c

### Usage

See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the metrics() function in the registry.
See example folder for a sample usage. The library does not bundle any web framework, to expose the metrics just return the `metrics()` function in the registry.

#### Usage with Node.js's `cluster` module

Node.js's `cluster` module spawns multiple processes and hands off socket connections to those workers. Returning metrics from a worker's local registry will only reveal that individual worker's metrics, which is generally undesirable. To solve this, you can aggregate all of the workers' metrics in the master process. See example folder for a sample usage.

The default metrics use sensible aggregation methods.

Custom metrics are summed across workers by default. To use a different aggregation method, set the `aggregator` property in the metric config to one of 'sum', 'first', 'min', 'max', 'average' or 'omit'. (See `lib/metrics/version.js` for an example.)

Note: You must use the default global registry in cluster mode.

### API

Expand Down Expand Up @@ -248,7 +258,7 @@ You can prevent this by setting last parameter when creating the metric to `fals
Using non-global registries requires creating Registry instance and adding it inside `registers` inside the configuration object. Alternatively
you can pass an empty `registers` array and register it manually.

Registry has a merge function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.
Registry has a `merge` function that enables you to expose multiple registries on the same endpoint. If the same metric name exists in both registries, an error will be thrown.

```js
const client = require('prom-client');
Expand Down Expand Up @@ -278,6 +288,22 @@ If you need to get a reference to a previously registered metric, you can use `r
You can remove all metrics by calling `register.clear()`. You can also remove a single metric by calling
`register.removeSingleMetric(*name of metric*)`.

##### Cluster metrics

You can get aggregated metrics for all workers in a node.js cluster with `register.clusterMetrics()`. This method both returns a promise and accepts a callback, both of which resolve with a metrics string suitable for Prometheus to consume.

```js
register.clusterMetrics()
.then(metrics => { /* ... */ })
.catch(err => { /* ... */ });

// - or -

register.clusterMetrics((err, metrics) => {
// ...
});
```

#### Pushgateway

It is possible to push metrics via a [Pushgateway](https://github.com/prometheus/pushgateway).
Expand Down
27 changes: 27 additions & 0 deletions example/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const aggregatorRegistry = new require('../').AggregatorRegistry();

if (cluster.isMaster) {
for (let i = 0; i < 4; i++) {
cluster.fork();
}

metricsServer.get('/cluster_metrics', (req, res) => {
aggregatorRegistry.clusterMetrics((err, metrics) => {
if (err) console.log(err);
res.set('Content-Type', aggregatorRegistry.contentType);
res.send(metrics);
});
});

metricsServer.listen(3001);
console.log(
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics'
);
} else {
require('./server.js');
}
9 changes: 8 additions & 1 deletion example/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const express = require('express');
const cluster = require('cluster');
const server = express();
const register = require('../').register;

Expand Down Expand Up @@ -48,6 +49,13 @@ setInterval(() => {
g.labels('post', '300').inc();
}, 100);

if (cluster.isWorker) {
// Expose some worker-specific metric as an example
setInterval(() => {
c.inc({ code: `worker_${cluster.worker.id}` });
}, 2000);
}

server.get('/metrics', (req, res) => {
res.set('Content-Type', register.contentType);
res.end(register.metrics());
Expand All @@ -61,6 +69,5 @@ server.get('/metrics/counter', (req, res) => {
//Enable collection of default metrics
require('../').collectDefaultMetrics();

//eslint-disable-next-line no-console
console.log('Server listening to 3000, metrics exposed on /metrics endpoint');
server.listen(3000);
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ exports.linearBuckets = require('./lib/bucketGenerators').linearBuckets;
exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBuckets;

exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.aggregators = require('./lib/metricAggregators').aggregators;
exports.AggregatorRegistry = require('./lib/cluster');
146 changes: 146 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use strict';

/**
* Extends the Registry class with a `clusterMetrics` method that returns
* aggregated metrics for all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
*/

const cluster = require('cluster');
const Registry = require('./registry');
const util = require('./util');
const aggregators = require('./metricAggregators').aggregators;

const GET_METRICS_REQ = 'prom-client:getMetricsReq';
const GET_METRICS_RES = 'prom-client:getMetricsRes';

let requestCtr = 0; // Concurrency control
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* @param {Function} callback? (err, metrics) => any
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
clusterMetrics(callback) {
const requestId = requestCtr++;

callback = callback || function() {};

return new Promise((resolve, reject) => {
const request = {
responses: [],
pending: Object.keys(cluster.workers).length,
callback,
resolve,
reject,
errorTimeout: setTimeout(() => {
request.failed = true;
const err = new Error('Operation timed out.');
request.callback(err);
reject(err);
}, 5000),
failed: false
};
requests.set(requestId, request);

const message = {
type: GET_METRICS_REQ,
requestId
};
for (const id in cluster.workers) cluster.workers[id].send(message);
});
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr) {
const aggregatedRegistry = new Registry();
const metricsByName = new util.Grouper();

// Gather by name
metricsArr.forEach(metrics => {
metrics.forEach(metric => {
metricsByName.add(metric.name, metric);
});
});

// Aggregate gathered metrics. Default to summation.
metricsByName.forEach(metrics => {
const aggregatorName = metrics[0].aggregator || 'sum';
const aggregatorFn = aggregators[aggregatorName];
if (typeof aggregatorFn !== 'function') {
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
}
const aggregatedMetric = aggregatorFn(metrics);
// NB: The 'omit' aggregator returns undefined.
if (aggregatedMetric) {
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric
},
aggregatedMetric
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}
}

module.exports = AggregatorRegistry;

if (cluster.isMaster) {
// Listen for worker responses to requests for local metrics
cluster.on('message', (worker, message) => {
if (arguments.length === 2) {
// pre-Node.js v6.0
message = worker;
worker = undefined;
}

if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);
request.responses.push(message.metrics);
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

if (request.failed) return; // Callback already run with Error.

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.callback(null, promString);
request.resolve(promString);
}
}
});
} else if (cluster.isWorker) {
// Respond to master's requests for worker's local metrics.
process.on('message', message => {
if (message.type === GET_METRICS_REQ) {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
// TODO see if we can support the non-global registry also.
metrics: Registry.globalRegistry.getMetricsAsJSON()
});
}
});
}
4 changes: 3 additions & 1 deletion lib/counter.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Counter {
}

this.help = config.help;
this.aggregator = config.aggregator || 'sum';

config.registers.forEach(registryInstance =>
registryInstance.registerMetric(this)
Expand Down Expand Up @@ -99,7 +100,8 @@ class Counter {
help: this.help,
name: this.name,
type,
values: getProperties(this.hashMap)
values: getProperties(this.hashMap),
aggregator: this.aggregator
};
}

Expand Down
29 changes: 15 additions & 14 deletions lib/gauge.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Gauge {
this.hashMap = createValue({}, 0, {});
}
this.help = config.help;
this.aggregator = config.aggregator || 'sum';

config.registers.forEach(registryInstance =>
registryInstance.registerMetric(this)
Expand All @@ -90,24 +91,23 @@ class Gauge {
}

/**
* Increment a gauge value
* @param {object} labels - Object with labels where key is the label key and value is label value. Can only be one level deep
* @param {Number} value - Value to increment - if omitted, increment with 1
* @param {(Number|Date)} timestamp - Timestamp to set the gauge to
* @returns {void}
*/
* Increment a gauge value
* @param {object} labels - Object with labels where key is the label key and value is label value. Can only be one level deep
* @param {Number} value - Value to increment - if omitted, increment with 1
* @param {(Number|Date)} timestamp - Timestamp to set the gauge to
* @returns {void}
*/
inc(labels, value, timestamp) {
inc.call(this, labels)(value, timestamp);
}

/**
* Decrement a gauge value
* @param {object} labels - Object with labels where key is the label key and value is label value. Can only be one level deep
* @param {Number} value - Value to decrement - if omitted, decrement with 1
* @param {(Number|Date)} timestamp - Timestamp to set the gauge to
* @returns {void}
*/
* Decrement a gauge value
* @param {object} labels - Object with labels where key is the label key and value is label value. Can only be one level deep
* @param {Number} value - Value to decrement - if omitted, decrement with 1
* @param {(Number|Date)} timestamp - Timestamp to set the gauge to
* @returns {void}
*/
dec(labels, value, timestamp) {
dec.call(this, labels)(value, timestamp);
}
Expand Down Expand Up @@ -140,7 +140,8 @@ class Gauge {
help: this.help,
name: this.name,
type,
values: getProperties(this.hashMap)
values: getProperties(this.hashMap),
aggregator: this.aggregator
};
}

Expand Down
4 changes: 3 additions & 1 deletion lib/histogram.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class Histogram {

this.name = config.name;
this.help = config.help;
this.aggregate = config.aggregator || 'sum';

this.upperBounds = config.buckets;
this.bucketValues = this.upperBounds.reduce((acc, upperBound) => {
Expand Down Expand Up @@ -113,7 +114,8 @@ class Histogram {
name: this.name,
help: this.help,
type,
values
values,
aggregator: this.aggregator
};
}

Expand Down
Loading

0 comments on commit d6f2cf6

Please sign in to comment.