Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure that only registered cluster workers are asked to report metrics #182

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@

const cluster = require('cluster');
const Registry = require('./registry');
const Gauge = require('./gauge');
const util = require('./util');
const aggregators = require('./metricAggregators').aggregators;

const GET_METRICS_REQ = 'prom-client:getMetricsReq';
const GET_METRICS_RES = 'prom-client:getMetricsRes';
const REG_METRICS_WORKER = 'prom-client:registerMetricsWorker';

let registries = [Registry.globalRegistry];
let requestCtr = 0; // Concurrency control
let listenersAdded = false;
const coordinatedWorkers = new Set();
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
constructor() {
/**
Create an AggregatorRegistry instance. Accepts an optional `options` object.

Options are:
coordinated If false (default), request metrics from all cluster workers. If true, request metrics only from workers that have required prom-client.
* @param {object?} options object
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the only reason this needs to be an option because setting it to true makes the order of forking vs. new AggregatorRegistry() matter? I'd rather go for an implementation that isn't sensitive to that, e.g. the worker repeatedly attempts to register with the master until the master acks the registration.

Having a heterogeneous pool of workers (not all of them setting up prom-client) is unusual, but I think the behavior achieved when this is true is what should happen by default.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the only reason to make it an option is that it would break backwards compatibility.

Given that a metrics client should be as unobtrusive as possible, I would be wary of adding a prolonged discovery phase. I would certainly prefer in my consuming codebase to keep things simple and accept that ordering matters.

In the case of the homogenous cluster, the previous implementation cleanly sidesteps a lot of the issues with “garbage collecting” the workers and nobody else had complained so far :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, we will be breaking backwards compatibility pretty hard soon ish (see #177, #178 and #180), so don't let semver stop you from writing the code you'd like to write :D

*/
constructor({ coordinated } = {}) {
super();
this.coordinated = coordinated || false;
addListeners();
}

Expand All @@ -32,13 +43,15 @@ class AggregatorRegistry extends Registry {
* returned Promise resolve with the same value; either may be used.
* @param {Function?} callback (err, metrics) => any
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
* metrics.
*/
clusterMetrics(callback) {
const requestId = requestCtr++;

return new Promise((resolve, reject) => {
const nWorkers = Object.keys(cluster.workers).length;
const nWorkers = this.coordinated
? coordinatedWorkers.size
: Object.keys(cluster.workers).length;

function done(err, result) {
// Don't resolve/reject the promise if a callback is provided
Expand All @@ -56,6 +69,7 @@ class AggregatorRegistry extends Registry {

const request = {
responses: [],
workerCount: nWorkers,
pending: nWorkers,
done,
errorTimeout: setTimeout(() => {
Expand All @@ -71,7 +85,19 @@ class AggregatorRegistry extends Registry {
type: GET_METRICS_REQ,
requestId
};
for (const id in cluster.workers) cluster.workers[id].send(message);
const workers = this.coordinated
? coordinatedWorkers
: Object.keys(cluster.workers);
for (const id of workers) {
const worker = cluster.workers[id];
if (worker === undefined || worker.isDead() || !worker.isConnected()) {
request.pending--;
request.workerCount--;
coordinatedWorkers.delete(id); // Set is safe to mutate while iterating
} else {
worker.send(message);
}
}
});
}

Expand All @@ -81,7 +107,7 @@ class AggregatorRegistry extends Registry {
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr) {
Expand Down Expand Up @@ -122,7 +148,7 @@ class AggregatorRegistry extends Registry {
* Sets the registry or registries to be aggregated. Call from workers to
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} regs Registry or registries to be
* aggregated.
* aggregated.
* @return {void}
*/
static setRegistries(regs) {
Expand Down Expand Up @@ -167,9 +193,19 @@ function addListeners() {
if (request.failed) return; // Callback already run with Error.

const registry = AggregatorRegistry.aggregate(request.responses);
const g = new Gauge({
name: 'nodejs_prom_client_cluster_workers',
help: 'Number of connected cluster workers reporting to prometheus',
registers: [registry]
});
g.set(request.workerCount);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to move this new metric to a separate PR. (If @siimon and @SimenB are okay with it, it's a simple change that could land before this PR is ironed out.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, however this is very specific to the coordinated option - perhaps it should be exposed only when coordinated is true. Perhaps a similar metric could be added to the default metrics collected that counts all the cluster workers?

const promString = registry.metrics();
request.done(null, promString);
}
} else if (message.type === REG_METRICS_WORKER) {
//setup coordinated workers
const workerId = message.workerId;
coordinatedWorkers.add(workerId);
}
});
}
Expand All @@ -186,6 +222,11 @@ if (cluster.isWorker) {
});
}
});

process.send({
type: REG_METRICS_WORKER,
workerId: cluster.worker.id
});
}

module.exports = AggregatorRegistry;