Skip to content

Commit

Permalink
Replace concurrent-queue (#4393)
Browse files Browse the repository at this point in the history
  • Loading branch information
soulgalore authored Jan 8, 2025
1 parent a17d570 commit 84010f6
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 115 deletions.
112 changes: 112 additions & 0 deletions lib/core/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Simplified implementation of concurrent-queue
// https://www.npmjs.com/package/concurrent-queue

export function createQueue() {
let concurrency = Number.POSITIVE_INFINITY;
let processor;

const tasks = [];
let runningCount = 0;

const enqueuedCallbacks = [];
const processingStartedCallbacks = [];
const processingEndedCallbacks = [];
const drainedCallbacks = [];

/**
* We will trigger drained callbacks when:
* tasks.length === 0 and runningCount === 0
*/
function checkDrained() {
if (tasks.length === 0 && runningCount === 0) {
for (const cb of drainedCallbacks) {
cb();
}
}
}

/**
* Attempt to start processing more tasks if we have
* capacity (runningCount < concurrency).
*/
function tryProcessNext() {
while (tasks.length > 0 && runningCount < concurrency) {
const item = tasks.shift();
runningCount++;

for (const cb of processingStartedCallbacks) {
cb({ item });
}

const promise = Promise.resolve(processor(item));

promise
.then(() => {
// Fire processingEnded callbacks
for (const cb of processingEndedCallbacks) {
cb({ item, err: undefined });
}
})
.catch(error => {
// Fire processingEnded callbacks with an error
for (const cb of processingEndedCallbacks) {
cb({ item, err: error });
}
})
.finally(() => {
runningCount--;
checkDrained();
tryProcessNext();
});
}
}

const queue = function enqueue(item) {
for (const cb of enqueuedCallbacks) {
cb({ item });
}
tasks.push(item);

tryProcessNext();
};

queue.limit = options => {
if (options && typeof options.concurrency === 'number') {
concurrency = options.concurrency;
}
return queue;
};

queue.process = fn => {
processor = fn;
return queue;
};

queue.enqueued = callback => {
enqueuedCallbacks.push(callback);
return queue;
};

queue.processingStarted = callback => {
processingStartedCallbacks.push(callback);
return queue;
};

queue.processingEnded = callback => {
processingEndedCallbacks.push(callback);
return queue;
};

queue.drained = callback => {
drainedCallbacks.push(callback);
return queue;
};

Object.defineProperty(queue, 'isDrained', {
get() {
return tasks.length === 0 && runningCount === 0;
}
});

return queue;
}
44 changes: 18 additions & 26 deletions lib/core/queueHandler.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/* eslint no-console:0 */

import cq from 'concurrent-queue';
import { getLogger } from '@sitespeed.io/log';

import { messageMaker } from '../support/messageMaker.js';
import {
registerQueueTime,
registerProcessingTime,
generateStatistics
} from './queueStatistics.js';
import { createQueue } from './queue.js';

const make = messageMaker('queueHandler').make;
const log = getLogger('sitespeedio.queuehandler');
Expand Down Expand Up @@ -103,46 +100,42 @@ export class QueueHandler {
this.queues = plugins
.filter(plugin => plugin.processMessage)
.map(plugin => {
const concurrency = plugin.concurrency || Number.POSITIVE_INFINITY;
const queue = cq().limit({ concurrency });
const concurrency = plugin.concurrency ?? Number.POSITIVE_INFINITY;
// Create a queue with that concurrency
const queue = createQueue().limit({ concurrency });

queue.plugin = plugin;

const messageWaitingStart = {},
messageProcessingStart = {};
const messageWaitingStart = {};
const messageProcessingStart = {};

queue.enqueued(object => {
const message = object.item;
const { item: message } = object;
messageWaitingStart[message.uuid] = process.hrtime();
});

queue.processingStarted(object => {
const message = object.item;

const { item: message } = object;
const waitingDuration = process.hrtime(
messageWaitingStart[message.uuid]
),
waitingNanos = waitingDuration[0] * 1e9 + waitingDuration[1];

messageWaitingStart[message.uuid]
);
const waitingNanos = waitingDuration[0] * 1e9 + waitingDuration[1];
registerQueueTime(message, queue.plugin, waitingNanos);

messageProcessingStart[message.uuid] = process.hrtime();
});

// FIXME handle rejections (i.e. failures while processing messages) properly
queue.processingEnded(object => {
const message = object.item;
const error = object.err;
const { item: message, err: error } = object;
if (error) {
let rejectionMessage =
'Rejected ' +
JSON.stringify(message, shortenData, 2) +
' for plugin: ' +
plugin.getName();

if (message && message.url)
rejectionMessage += ', url: ' + message.url;

if (message?.url) {
rejectionMessage += `, url: ${message.url}`;
}
if (error.stack) {
log.error(error.stack);
}
Expand All @@ -154,7 +147,6 @@ export class QueueHandler {
);
const processingNanos =
processingDuration[0] * 1e9 + processingDuration[1];

registerProcessingTime(message, queue.plugin, processingNanos);
});

Expand Down Expand Up @@ -221,9 +213,9 @@ export class QueueHandler {
}

async startProcessingQueues() {
for (let item of this.queues) {
const queue = item.queue,
plugin = item.plugin;
for (const item of this.queues) {
const { queue, plugin } = item;
// For each queue, set up the processor that handles messages
queue.process(message =>
Promise.resolve(plugin.processMessage(message, this))
);
Expand Down
88 changes: 0 additions & 88 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
"axe-core": "4.10.2",
"browsertime": "24.0.0-alpha.1",
"coach-core": "8.1.1",
"concurrent-queue": "7.0.2",
"dayjs": "1.11.11",
"fast-crc32c": "2.0.0",
"fast-stats": "0.0.7",
Expand Down

0 comments on commit 84010f6

Please sign in to comment.