Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

module: allow module.register from workers #53200

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ port.on('message', (message) => {
filename,
hasStdin,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -109,6 +110,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
157 changes: 112 additions & 45 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict';

const {
ArrayPrototypeFilter,
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypePushApply,
ArrayPrototypeSome,
AtomicsLoad,
AtomicsWait,
AtomicsWaitAsync,
Expand Down Expand Up @@ -164,6 +167,17 @@ class Hooks {
* @returns {any | Promise<any>} User data, ignored unless it's a promise, in which case it will be awaited.
*/
addCustomLoader(url, exports, data) {
const alreadyKnown = ArrayPrototypeSome(ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => {
if (this.#chains[hookName]) {
return ArrayPrototypeFilter(
this.#chains[hookName], (el) => el.url === url && el.data === data).length === 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this comparison of data is correct and this might actually never be triggered. Can you add a test?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one good test would be (all filenames for illustration only, please use whatever names correspond to the appropriate fixtures):

  • Node is run via node --import=register.js app.js
  • register.js contains a register call that registers some hooks, and one of these hooks prints something on initialization
  • app.js contains new Worker('./worker.js') to create a worker thread without any specific customization (no execArgv)
  • Verify that the “print on initialization” doesn’t happen a second time

The point of this test is to ensure that even though new Worker without execArgv inherits the --import flag from the initial Node process, and even though register.js runs twice, the hooks don’t get registered twice. This filter check prevents double registration of the same hooks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this comparison of data is correct and this might actually never be triggered. Can you add a test?

Of course. This must be a deep equality compare. Will update. Tests need generally to be added for quite a few things. Just wanted to make sure there is agreement on the approach and featureset/constraints before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dygabo In my (soon to be published) PR I used isDeepStrictEqual from internal/util/comparisons, which is the one internally used from assert.

}
}), (el) => el);

if (alreadyKnown) {
return undefined;
}

const {
initialize,
resolve,
Expand All @@ -172,11 +186,11 @@ class Hooks {

if (resolve) {
const next = this.#chains.resolve[this.#chains.resolve.length - 1];
ArrayPrototypePush(this.#chains.resolve, { __proto__: null, fn: resolve, url, next });
ArrayPrototypePush(this.#chains.resolve, { __proto__: null, fn: resolve, url, data, next });
}
if (load) {
const next = this.#chains.load[this.#chains.load.length - 1];
ArrayPrototypePush(this.#chains.load, { __proto__: null, fn: load, url, next });
ArrayPrototypePush(this.#chains.load, { __proto__: null, fn: load, url, data, next });
}
return initialize?.(data);
}
Expand All @@ -198,6 +212,7 @@ class Hooks {
originalSpecifier,
parentURL,
importAttributes = { __proto__: null },
threadId
) {
throwIfInvalidParentURL(parentURL);

Expand All @@ -206,6 +221,7 @@ class Hooks {
conditions: getDefaultConditions(),
importAttributes,
parentURL,
threadId,
};
const meta = {
chainFinished: null,
Expand Down Expand Up @@ -478,10 +494,12 @@ class HooksProxy {
*/
#lock;
/**
* The InternalWorker instance, which lets us communicate with the loader thread.
* The HooksWorker instance, which lets us communicate with the loader thread.
*/
#worker;

#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
* if the worker has already sent a notification before putting the main
Expand All @@ -497,28 +515,50 @@ class HooksProxy {
#numberOfPendingAsyncResponses = 0;

#isReady = false;
#isWorkerOwner = false;

constructor() {
const { InternalWorker } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const { HooksWorker, hooksPort } = require('internal/worker');
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
try {
// The customization hooks thread is only created once. All other threads reuse
// the existing instance. If another thread created it, the constructor will throw
// Fallback is to use the existing hooksPort created by the thread that originally
// spawned the customization hooks thread.
this.#worker = new HooksWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#isWorkerOwner = true;
this.#portToHooksThread = this.#worker;
} catch (e) {
if (e.code === 'ERR_HOOKS_THREAD_EXISTS') {
this.#portToHooksThread = hooksPort;
} else {
throw e;
}
}
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of
// these Hooks instances has a HooksWorker. That was the Hooks instance
// created for the first thread that registers a hook set.
// It means for all Hooks instances that are not on that thread => they are
// done because they delegate to the single HooksWorker.
if (!this.#isWorkerOwner) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -535,6 +575,37 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;

const {
port1: fromHooksThread,
port2: toHooksThread,
} = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}

this.#portToHooksThread.postMessage(
{
__proto__: null,
args,
lock: this.#lock,
method,
port: toHooksThread,
},
usedTransferList,
);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -543,22 +614,7 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -567,7 +623,11 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
this.#worker.ref();
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
}

let response;
Expand All @@ -576,18 +636,26 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(asyncCommChannel.port1);
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
this.#worker.unref();
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
}

const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
fromHooksThread.close();

return this.#unwrapMessage(response);
}

/**
Expand All @@ -598,11 +666,7 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);

let response;
do {
Expand All @@ -611,14 +675,17 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = this.#worker.receiveMessageSync();
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();

return this.#unwrapMessage(response);
}

Expand Down
19 changes: 14 additions & 5 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { threadId } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -500,7 +501,7 @@ class ModuleLoader {
*/
resolve(originalSpecifier, parentURL, importAttributes) {
if (this.#customizations) {
return this.#customizations.resolve(originalSpecifier, parentURL, importAttributes);
return this.#customizations.resolve(originalSpecifier, parentURL, importAttributes, threadId);
}
const requestKey = this.#resolveCache.serializeKey(originalSpecifier, importAttributes);
const cachedResult = this.#resolveCache.get(requestKey, parentURL);
Expand Down Expand Up @@ -607,21 +608,22 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register some loader specifier.
* Register a loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] }}
* @returns {{ format: string, url: URL['href'] } | undefined}
*/
register(originalSpecifier, parentURL, data, transferList) {
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data, threadId);
}

/**
Expand All @@ -634,7 +636,7 @@ class CustomizedModuleLoader {
* @returns {{ format: string, url: URL['href'] }}
*/
resolve(originalSpecifier, parentURL, importAttributes) {
return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes);
return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes, threadId);
}

resolveSync(originalSpecifier, parentURL, importAttributes) {
Expand Down Expand Up @@ -719,6 +721,12 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -780,6 +788,7 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading