Skip to content

Commit

Permalink
feat: support aggregating metrics across workers in a Node.js worker_…
Browse files Browse the repository at this point in the history
…threads
  • Loading branch information
Rafal Augustyniak committed Mar 14, 2023
1 parent 2c37e80 commit 7e1fc1f
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ project adheres to [Semantic Versioning](http://semver.org/).
### Added

- Support for OpenMetrics and Exemplars
- Support aggregating metrics across workers in a Node.js `worker_threads`

## [14.2.0] - 2023-03-06

Expand Down
27 changes: 27 additions & 0 deletions example/cluster-with-worker-threads/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const cluster = require('cluster');
const server = require('./server');

const AggregatorRegistry = require('../..').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

for (let i = 0; i < 2; i++) {
cluster.fork();
}

cluster.on('exit', worker => {
console.log(`worker ${worker.process.pid} died`);
console.log("Let's fork another worker!");
cluster.fork();
});
} else {
server({
metrics: {
contentType: aggregatorRegistry.contentType,
},
});
}
40 changes: 40 additions & 0 deletions example/cluster-with-worker-threads/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const express = require('express');
const { Counter } = require('../..');
const AggregatorRegistry = require('../..').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();
const worker = require('./worker');
const PORT = 3000;

const http_request_total = new Counter({
name: 'http_request_total',
help: 'request count | clusterId="20212" statusCode="2xx|4xx|5xx"',
labelNames: ['clusterId', 'statusCode'],
});

module.exports = (options = {}) => {
const {
metrics: { contentType = '' },
} = options;

const app = express();

app.get('/', async (req, res) => {
http_request_total.inc({ clusterId: process.pid, statusCode: 200 });
const result = await worker();

res.send(result);
});

app.get('/metrics', async (req, res) => {
const metrics = await aggregatorRegistry.workersMetrics();

res.set('Content-Type', contentType);
res.send(metrics);
});

app.listen(PORT, () => {
console.log(`cluster: #${process.pid} - listening on port ${PORT}`);
});
};
29 changes: 29 additions & 0 deletions example/cluster-with-worker-threads/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const { Counter } = require('../..');
const {
Worker,
isMainThread,
parentPort,
threadId,
} = require('worker_threads');

if (isMainThread) {
module.exports = () =>
new Promise((resolve, reject) => {
const worker = new Worker(__filename);

worker.on('message', resolve);
worker.on('error', reject);
});
} else {
const worker_invocation_total = new Counter({
name: 'worker_invocation_total',
help: 'worker invocation count | threadId="20212"',
labelNames: ['threadId'],
});

worker_invocation_total.inc({ threadId });

parentPort.postMessage(`result: ${Math.random()}`);
}
31 changes: 0 additions & 31 deletions example/cluster.js

This file was deleted.

27 changes: 27 additions & 0 deletions example/cluster/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const cluster = require('cluster');
const server = require('./server');

const AggregatorRegistry = require('../..').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

for (let i = 0; i < 2; i++) {
cluster.fork();
}

cluster.on('exit', worker => {
console.log(`worker ${worker.process.pid} died`);
console.log("Let's fork another worker!");
cluster.fork();
});
} else {
server({
metrics: {
contentType: aggregatorRegistry.contentType,
},
});
}
38 changes: 38 additions & 0 deletions example/cluster/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';

const express = require('express');
const { Counter } = require('../..');
const AggregatorRegistry = require('../..').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();
const PORT = 3000;

const http_request_total = new Counter({
name: 'http_request_total',
help: 'request count | clusterId="20212" statusCode="2xx|4xx|5xx"',
labelNames: ['clusterId', 'statusCode'],
});

module.exports = (options = {}) => {
const {
metrics: { contentType = '' },
} = options;

const app = express();

app.get('/', (req, res) => {
http_request_total.inc({ clusterId: process.pid, statusCode: 200 });

res.send('OK');
});

app.get('/metrics', async (req, res) => {
const metrics = await aggregatorRegistry.clusterMetrics();

res.set('Content-Type', contentType);
res.send(metrics);
});

app.listen(PORT, () => {
console.log(`cluster: #${process.pid} - listening on port ${PORT}`);
});
};
8 changes: 0 additions & 8 deletions example/server.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const express = require('express');
const cluster = require('cluster');
const server = express();
const register = require('../').register;

Expand Down Expand Up @@ -68,13 +67,6 @@ setInterval(() => {
g.labels('post', '300').inc();
}, 100);

if (cluster.isWorker) {
// Expose some worker-specific metric as an example
setInterval(() => {
c.inc({ code: `worker_${cluster.worker.id}` });
}, 2000);
}

const t = [];
setInterval(() => {
for (let i = 0; i < 100; i++) {
Expand Down
34 changes: 34 additions & 0 deletions example/worker-threads/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const express = require('express');
const { Counter } = require('../..');
const AggregatorRegistry = require('../..').AggregatorRegistry;
const aggregatorRegistry = new AggregatorRegistry();
const worker = require('./worker');
const PORT = 3000;

const http_request_total = new Counter({
name: 'http_request_total',
help: 'request count | statusCode="2xx|4xx|5xx"',
labelNames: ['statusCode'],
});

const app = express();

app.get('/', async (req, res) => {
http_request_total.inc({ statusCode: 200 });
const result = await worker();

res.send(result);
});

app.get('/metrics', async (req, res) => {
const metrics = await aggregatorRegistry.workersMetrics();

res.set('Content-Type', aggregatorRegistry.contentType);
res.send(metrics);
});

app.listen(PORT, () => {
console.log(`listening on port ${PORT}`);
});
30 changes: 30 additions & 0 deletions example/worker-threads/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

const { Counter } = require('../..');
const {
Worker,
isMainThread,
parentPort,
threadId,
} = require('worker_threads');

if (isMainThread) {
const worker = new Worker(__filename);

module.exports.workers = [worker];
module.exports.calculate = () =>
new Promise((resolve, reject) => {
worker.emit('message', resolve);
worker.on('error', reject);
});
} else {
const worker_invocation_total = new Counter({
name: 'worker_invocation_total',
help: 'worker invocation count | threadId="20212"',
labelNames: ['threadId'],
});

worker_invocation_total.inc({ threadId });

parentPort.postMessage(`result: ${Math.random()}`);
}
Loading

0 comments on commit 7e1fc1f

Please sign in to comment.