From da93a14cb6ed913e9ed621451c731a3cc4484239 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Tue, 4 Jun 2024 09:14:06 +0200 Subject: [PATCH 01/12] module: have a single hooks thread for all workers `module.register` not supported when called from worker threads with this commit it only works for the main thread. --- lib/internal/main/worker_thread.js | 2 + lib/internal/modules/esm/hooks.js | 127 ++++++++++++------ lib/internal/modules/esm/loader.js | 19 ++- lib/internal/modules/esm/worker.js | 74 ++++++++-- lib/internal/worker.js | 32 ++++- src/handle_wrap.cc | 3 +- test/common/index.mjs | 2 + test/es-module/test-esm-loader-mock.mjs | 7 +- test/es-module/test-esm-loader-threads.mjs | 74 ++++++++++ test/es-module/test-esm-named-exports.js | 4 +- test/es-module/test-esm-named-exports.mjs | 9 +- test/es-module/test-esm-virtual-json.mjs | 3 +- .../builtin-named-exports.mjs | 13 +- .../es-module-loaders/hooks-exit-worker.mjs | 21 +++ test/fixtures/es-module-loaders/hooks-log.mjs | 19 +++ .../not-found-assert-loader.mjs | 17 +-- .../es-module-loaders/worker-fail-on-load.mjs | 1 + .../worker-fail-on-resolve.mjs | 1 + .../es-module-loaders/worker-log-again.mjs | 3 + .../worker-log-fail-worker-load.mjs | 12 ++ .../worker-log-fail-worker-resolve.mjs | 12 ++ .../fixtures/es-module-loaders/worker-log.mjs | 9 ++ .../es-module-loaders/workers-spawned.mjs | 7 + 23 files changed, 392 insertions(+), 79 deletions(-) create mode 100644 test/es-module/test-esm-loader-threads.mjs create mode 100644 test/fixtures/es-module-loaders/hooks-exit-worker.mjs create mode 100644 test/fixtures/es-module-loaders/hooks-log.mjs create mode 100644 test/fixtures/es-module-loaders/worker-fail-on-load.mjs create mode 100644 test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs create mode 100644 test/fixtures/es-module-loaders/worker-log-again.mjs create mode 100644 test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs create mode 100644 test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs create mode 100644 test/fixtures/es-module-loaders/worker-log.mjs create mode 100644 test/fixtures/es-module-loaders/workers-spawned.mjs diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index aa329b9fe04f15..32bccca9b53a72 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -95,6 +95,7 @@ port.on('message', (message) => { filename, hasStdin, publicPort, + hooksPort, workerData, } = message; @@ -109,6 +110,7 @@ port.on('message', (message) => { } require('internal/worker').assignEnvironmentData(environmentData); + require('internal/worker').hooksPort = hooksPort; if (SharedArrayBuffer !== undefined) { // The counter is only passed to the workers created by the main thread, diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index ba655116a0bb57..f5833ad61cdb75 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -35,7 +35,7 @@ const { const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors'); const { URL } = require('internal/url'); const { canParse: URLCanParse } = internalBinding('url'); -const { receiveMessageOnPort } = require('worker_threads'); +const { receiveMessageOnPort, isMainThread } = require('worker_threads'); const { isAnyArrayBuffer, isArrayBufferView, @@ -482,6 +482,8 @@ class HooksProxy { */ #worker; + #portToHooksThread; + /** * The last notification ID received from the worker. This is used to detect * if the worker has already sent a notification before putting the main @@ -499,26 +501,38 @@ class HooksProxy { #isReady = false; constructor() { - const { InternalWorker } = require('internal/worker'); - MessageChannel ??= require('internal/worker/io').MessageChannel; - + const { InternalWorker, hooksPort } = require('internal/worker'); const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH); this.#lock = new Int32Array(lock); - this.#worker = new InternalWorker(loaderWorkerId, { - stderr: false, - stdin: false, - stdout: false, - trackUnmanagedFds: false, - workerData: { - lock, - }, - }); - this.#worker.unref(); // ! Allows the process to eventually exit. - this.#worker.on('exit', process.exit); + if (isMainThread) { + // Main thread is the only one that creates the internal single hooks worker + this.#worker = new InternalWorker(loaderWorkerId, { + stderr: false, + stdin: false, + stdout: false, + trackUnmanagedFds: false, + workerData: { + lock, + }, + }); + this.#worker.unref(); // ! Allows the process to eventually exit. + this.#worker.on('exit', process.exit); + this.#portToHooksThread = this.#worker; + } else { + this.#portToHooksThread = hooksPort; + } } waitForWorker() { + // There is one Hooks instance for each worker thread. But only one of these Hooks instances + // has an InternalWorker. That was the Hooks instance created for the main thread. + // It means for all Hooks instances that are not on the main thread => they are ready because they + // delegate to the single InternalWorker anyway. + if (!isMainThread) { + return; + } + if (!this.#isReady) { const { kIsOnline } = require('internal/worker'); if (!this.#worker[kIsOnline]) { @@ -535,6 +549,37 @@ class HooksProxy { } } + #postMessageToWorker(method, type, transferList, args) { + this.waitForWorker(); + + MessageChannel ??= require('internal/worker/io').MessageChannel; + + const { + port1: fromHooksThread, + port2: toHooksThread, + } = new MessageChannel(); + + // Pass work to the worker. + debug(`post ${type} message to worker`, { method, args, transferList }); + const usedTransferList = [toHooksThread]; + if (transferList) { + ArrayPrototypePushApply(usedTransferList, transferList); + } + + this.#portToHooksThread.postMessage( + { + __proto__: null, + args, + lock: this.#lock, + method, + port: toHooksThread, + }, + usedTransferList, + ); + + return fromHooksThread; + } + /** * Invoke a remote method asynchronously. * @param {string} method Method to invoke @@ -543,22 +588,7 @@ class HooksProxy { * @returns {Promise} */ async makeAsyncRequest(method, transferList, ...args) { - this.waitForWorker(); - - MessageChannel ??= require('internal/worker/io').MessageChannel; - const asyncCommChannel = new MessageChannel(); - - // Pass work to the worker. - debug('post async message to worker', { method, args, transferList }); - const finalTransferList = [asyncCommChannel.port2]; - if (transferList) { - ArrayPrototypePushApply(finalTransferList, transferList); - } - this.#worker.postMessage({ - __proto__: null, - method, args, - port: asyncCommChannel.port2, - }, finalTransferList); + const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args); if (this.#numberOfPendingAsyncResponses++ === 0) { // On the next lines, the main thread will await a response from the worker thread that might @@ -567,7 +597,11 @@ class HooksProxy { // However we want to keep the process alive until the worker thread responds (or until the // event loop of the worker thread is also empty), so we ref the worker until we get all the // responses back. - this.#worker.ref(); + if (this.#worker) { + this.#worker.ref(); + } else { + this.#portToHooksThread.ref(); + } } let response; @@ -576,18 +610,26 @@ class HooksProxy { await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value; this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); - response = receiveMessageOnPort(asyncCommChannel.port1); + response = receiveMessageOnPort(fromHooksThread); } while (response == null); debug('got async response from worker', { method, args }, this.#lock); if (--this.#numberOfPendingAsyncResponses === 0) { // We got all the responses from the worker, its job is done (until next time). - this.#worker.unref(); + if (this.#worker) { + this.#worker.unref(); + } else { + this.#portToHooksThread.unref(); + } + } + + if (response.message.status === 'exit') { + process.exit(response.message.body); } - const body = this.#unwrapMessage(response); - asyncCommChannel.port1.close(); - return body; + fromHooksThread.close(); + + return this.#unwrapMessage(response); } /** @@ -598,11 +640,7 @@ class HooksProxy { * @returns {any} */ makeSyncRequest(method, transferList, ...args) { - this.waitForWorker(); - - // Pass work to the worker. - debug('post sync message to worker', { method, args, transferList }); - this.#worker.postMessage({ __proto__: null, method, args }, transferList); + const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args); let response; do { @@ -611,7 +649,7 @@ class HooksProxy { AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId); this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); - response = this.#worker.receiveMessageSync(); + response = receiveMessageOnPort(fromHooksThread); } while (response == null); debug('got sync response from worker', { method, args }); if (response.message.status === 'never-settle') { @@ -619,6 +657,9 @@ class HooksProxy { } else if (response.message.status === 'exit') { process.exit(response.message.body); } + + fromHooksThread.close(); + return this.#unwrapMessage(response); } diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index afb0e9fd9ec1c6..e060b36eccacab 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap'); const { urlToFilename, } = require('internal/modules/helpers'); +const { isMainThread } = require('worker_threads'); let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer; /** @@ -607,10 +608,11 @@ class CustomizedModuleLoader { */ constructor() { getHooksProxy(); + _hasCustomizations = true; } /** - * Register some loader specifier. + * Register a loader specifier. * @param {string} originalSpecifier The specified URL path of the loader to * be registered. * @param {string} parentURL The parent URL from where the loader will be @@ -618,10 +620,14 @@ class CustomizedModuleLoader { * @param {any} [data] Arbitrary data to be passed from the custom loader * (user-land) to the worker. * @param {any[]} [transferList] Objects in `data` that are changing ownership - * @returns {{ format: string, url: URL['href'] }} + * @returns {{ format: string, url: URL['href'] } | undefined} */ register(originalSpecifier, parentURL, data, transferList) { - return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); + if (isMainThread) { + // Only the main thread has a Hooks instance with worker thread. All other Worker threads + // delegate their hooks to the HooksThread of the main thread. + return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); + } } /** @@ -719,6 +725,12 @@ function getHooksProxy() { return hooksProxy; } +let _hasCustomizations = false; +function hasCustomizations() { + return _hasCustomizations; +} + + let cascadedLoader; /** @@ -780,6 +792,7 @@ function register(specifier, parentURL = undefined, options) { module.exports = { createModuleLoader, + hasCustomizations, getHooksProxy, getOrInitializeCascadedLoader, register, diff --git a/lib/internal/modules/esm/worker.js b/lib/internal/modules/esm/worker.js index 311d77fb099384..088667f3c0d5d7 100644 --- a/lib/internal/modules/esm/worker.js +++ b/lib/internal/modules/esm/worker.js @@ -1,6 +1,8 @@ 'use strict'; const { + ArrayPrototypeFilter, + ArrayPrototypePush, AtomicsAdd, AtomicsNotify, DataViewPrototypeGetBuffer, @@ -97,7 +99,21 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { // so it can detect the exit event. const { exit } = process; process.exit = function(code) { - syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode)); + const exitMsg = wrapMessage('exit', code ?? process.exitCode); + if (hooks) { + for (let i = 0; i < allThreadRegisteredHandlerPorts.length; i++) { + const { port: registeredPort } = allThreadRegisteredHandlerPorts[i]; + registeredPort.postMessage(exitMsg); + } + + for (const { port, lock: operationLock } of unsettledResponsePorts) { + port.postMessage(exitMsg); + // Wake all threads that have pending operations. + AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); + } + } + syncCommPort.postMessage(exitMsg); AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); return ReflectApply(exit, this, arguments); @@ -145,8 +161,11 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { const unsettledResponsePorts = new SafeSet(); process.on('beforeExit', () => { - for (const port of unsettledResponsePorts) { + for (const { port, lock: operationLock } of unsettledResponsePorts) { port.postMessage(wrapMessage('never-settle')); + // Wake all threads that have pending operations. + AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); } unsettledResponsePorts.clear(); @@ -164,24 +183,59 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { setImmediate(() => {}); }); + let allThreadRegisteredHandlerPorts = []; + /** + * @callback registerHandler + * @param {MessagePort} toWorkerThread - Upon Worker creation a message channel between the new Worker + * and the Hooks thread is being initialized. This is the MessagePort that the Hooks thread will use post + * messages to the worker. The other MessagePort is passed to the new Worker itself via LOAD_SCRIPT message + */ + function registerHandler(toWorkerThread, registeredThreadId) { + toWorkerThread.on('message', handleMessage); + ArrayPrototypePush(allThreadRegisteredHandlerPorts, { port: toWorkerThread, registeredThreadId }); + } + + /** + * @callback registerHandler + * @param {number} unregisteredThreadId - the thread id of the worker thread that is being unregistered + * from the Hooks Thread + */ + function unregisterHandler(unregisteredThreadId) { + allThreadRegisteredHandlerPorts = ArrayPrototypeFilter( + allThreadRegisteredHandlerPorts, (el) => el.registeredThreadId !== unregisteredThreadId); + } + + function getMessageHandler(method) { + if (method === '#registerWorkerClient') { + return registerHandler; + } + if (method === '#unregisterWorkerClient') { + return unregisterHandler; + } + return hooks[method]; + } + /** * Handles incoming messages from the main thread or other workers. * @param {object} options - The options object. * @param {string} options.method - The name of the hook. * @param {Array} options.args - The arguments to pass to the method. * @param {MessagePort} options.port - The message port to use for communication. + * @param {Int32Array} options.lock - The shared memory where the caller expects to get awaken. */ - async function handleMessage({ method, args, port }) { + async function handleMessage({ method, args, port, lock: msgLock }) { // Each potential exception needs to be caught individually so that the correct error is sent to // the main thread. let hasError = false; let shouldRemoveGlobalErrorHandler = false; - assert(typeof hooks[method] === 'function'); + const messageHandler = getMessageHandler(method); + assert(typeof messageHandler === 'function'); if (port == null && !hasUncaughtExceptionCaptureCallback()) { // When receiving sync messages, we want to unlock the main thread when there's an exception. process.on('uncaughtException', errorHandler); shouldRemoveGlobalErrorHandler = true; } + const usedLock = msgLock ?? lock; // We are about to yield the execution with `await ReflectApply` below. In case the code // following the `await` never runs, we remove the message handler so the `beforeExit` event @@ -192,17 +246,19 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { clearImmediate(immediate); immediate = setImmediate(checkForMessages).unref(); - unsettledResponsePorts.add(port ?? syncCommPort); + const unsettledActionData = { port: port ?? syncCommPort, lock: usedLock }; + + unsettledResponsePorts.add(unsettledActionData); let response; try { - response = await ReflectApply(hooks[method], hooks, args); + response = await ReflectApply(messageHandler, hooks, args); } catch (exception) { hasError = true; response = exception; } - unsettledResponsePorts.delete(port ?? syncCommPort); + unsettledResponsePorts.delete(unsettledActionData); // Send the method response (or exception) to the main thread. try { @@ -215,8 +271,8 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { (port ?? syncCommPort).postMessage(wrapMessage('error', exception)); } - AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); - AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); + AtomicsAdd(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); if (shouldRemoveGlobalErrorHandler) { process.off('uncaughtException', errorHandler); } diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 9eefaa97021756..766659d4464379 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -132,7 +132,7 @@ class Worker extends EventEmitter { constructor(filename, options = kEmptyObject) { throwIfBuildingSnapshot('Creating workers'); super(); - const isInternal = arguments[2] === kIsInternal; + const isInternal = this.#isInternal = arguments[2] === kIsInternal; debug( `[${threadId}] create new worker`, filename, @@ -258,6 +258,15 @@ class Worker extends EventEmitter { ...new SafeArrayIterator(options.transferList)); this[kPublicPort] = port1; + const { + port1: toWorkerThread, + port2: toHooksThread, + } = new MessageChannel(); + if (!isInternal) { + // This is not an internal hooks thread => it needs a channel to the hooks thread: + // - send it one side of a channel here + ArrayPrototypePush(transferList, toHooksThread); + } ArrayPrototypeForEach(['message', 'messageerror'], (event) => { this[kPublicPort].on(event, (message) => this.emit(event, message)); }); @@ -272,8 +281,20 @@ class Worker extends EventEmitter { workerData: options.workerData, environmentData, publicPort: port2, + hooksPort: !isInternal ? toHooksThread : undefined, hasStdin: !!options.stdin, }, transferList); + + const loaderModule = require('internal/modules/esm/loader'); + const hasCustomizations = loaderModule.hasCustomizations(); + + if (!isInternal && hasCustomizations) { + // - send the second side of the channel to the hooks thread, + // also announce the threadId of the Worker that will use that port. + // This is needed for the cleanup stage + loaderModule.getHooksProxy().makeSyncRequest( + '#registerWorkerClient', [toWorkerThread], toWorkerThread, this.threadId); + } // Use this to cache the Worker's loopStart value once available. this[kLoopStartTime] = -1; this[kIsOnline] = false; @@ -293,6 +314,12 @@ class Worker extends EventEmitter { [kOnExit](code, customErr, customErrReason) { debug(`[${threadId}] hears end event for Worker ${this.threadId}`); + const loaderModule = require('internal/modules/esm/loader'); + const hasCustomizations = loaderModule.hasCustomizations(); + + if (!this.#isInternal && hasCustomizations) { + loaderModule.getHooksProxy()?.makeAsyncRequest('#unregisterWorkerClient', undefined, this.threadId); + } drainMessagePort(this[kPublicPort]); drainMessagePort(this[kPort]); this.removeAllListeners('message'); @@ -435,6 +462,8 @@ class Worker extends EventEmitter { return makeResourceLimits(this[kHandle].getResourceLimits()); } + #isInternal = false; + getHeapSnapshot(options) { const { HeapSnapshotStream, @@ -532,6 +561,7 @@ module.exports = { kIsOnline, isMainThread, SHARE_ENV, + hooksPort: undefined, resourceLimits: !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, setEnvironmentData, diff --git a/src/handle_wrap.cc b/src/handle_wrap.cc index f651b81ca8ce6b..1fbe0178c7a10d 100644 --- a/src/handle_wrap.cc +++ b/src/handle_wrap.cc @@ -148,7 +148,8 @@ void HandleWrap::OnClose(uv_handle_t* handle) { wrap->OnClose(); wrap->handle_wrap_queue_.Remove(); - if (!wrap->persistent().IsEmpty() && + if (!env->isolate()->IsExecutionTerminating() && + !wrap->persistent().IsEmpty() && wrap->object() ->Has(env->context(), env->handle_onclose_symbol()) .FromMaybe(false)) { diff --git a/test/common/index.mjs b/test/common/index.mjs index 430527faf8f0ea..6b566a899a35bb 100644 --- a/test/common/index.mjs +++ b/test/common/index.mjs @@ -51,6 +51,7 @@ const { skipIfDumbTerminal, skipIfEslintMissing, skipIfInspectorDisabled, + skipIfWorker, spawnPromisified, } = common; @@ -106,5 +107,6 @@ export { skipIfDumbTerminal, skipIfEslintMissing, skipIfInspectorDisabled, + skipIfWorker, spawnPromisified, }; diff --git a/test/es-module/test-esm-loader-mock.mjs b/test/es-module/test-esm-loader-mock.mjs index 164d0ac3775039..0d39f549581a54 100644 --- a/test/es-module/test-esm-loader-mock.mjs +++ b/test/es-module/test-esm-loader-mock.mjs @@ -1,6 +1,11 @@ -import '../common/index.mjs'; +import { skipIfWorker } from '../common/index.mjs'; import assert from 'node:assert/strict'; import { mock } from '../fixtures/es-module-loaders/mock.mjs'; +// Importing mock.mjs above will call `register` to modify the loaders chain. +// Modifying the loader chain is not supported currently when running from a worker thread. +// Relevant PR: https://github.com/nodejs/node/pull/52706 +// See comment: https://github.com/nodejs/node/pull/52706/files#r1585144580 +skipIfWorker(); mock('node:events', { EventEmitter: 'This is mocked!' diff --git a/test/es-module/test-esm-loader-threads.mjs b/test/es-module/test-esm-loader-threads.mjs new file mode 100644 index 00000000000000..7310a9ac5b54ac --- /dev/null +++ b/test/es-module/test-esm-loader-threads.mjs @@ -0,0 +1,74 @@ +import { spawnPromisified } from '../common/index.mjs'; +import * as fixtures from '../common/fixtures.mjs'; +import { strictEqual } from 'node:assert'; +import { execPath } from 'node:process'; +import { describe, it } from 'node:test'; + +describe('off-thread hooks', { concurrency: true }, () => { + it('uses only one hooks thread to support multiple application threads', async () => { + const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [ + '--no-warnings', + '--import', + `data:text/javascript,${encodeURIComponent(` + import { register } from 'node:module'; + register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-log.mjs'))}); + `)}`, + fixtures.path('es-module-loaders/workers-spawned.mjs'), + ]); + + strictEqual(stderr, ''); + strictEqual(stdout.split('\n').filter((line) => line.startsWith('initialize')).length, 1); + strictEqual(stdout.split('\n').filter((line) => line === 'foo').length, 2); + strictEqual(stdout.split('\n').filter((line) => line === 'bar').length, 4); + // Calls to resolve/load: + // 1x main script: test/fixtures/es-module-loaders/workers-spawned.mjs + // 3x worker_threads + // => 1x test/fixtures/es-module-loaders/worker-log.mjs + // 2x test/fixtures/es-module-loaders/worker-log-again.mjs => once per worker-log.mjs Worker instance + // 2x test/fixtures/es-module-loaders/worker-log.mjs => once per worker-log.mjs Worker instance + // 4x test/fixtures/es-module-loaders/worker-log-again.mjs => 2x for each worker-log + // 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs + // =========================== + // 16 calls to resolve + 16 calls to load hook for the registered custom loader + strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 16); + strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked load')).length, 16); + strictEqual(code, 0); + strictEqual(signal, null); + }); + + it('propagates the exit code from worker thread import exiting from resolve hook', async () => { + const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [ + '--no-warnings', + '--import', + `data:text/javascript,${encodeURIComponent(` + import { register } from 'node:module'; + register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-exit-worker.mjs'))}); + `)}`, + fixtures.path('es-module-loaders/worker-log-fail-worker-resolve.mjs'), + ]); + + strictEqual(stderr, ''); + strictEqual(stdout.split('\n').filter((line) => line.startsWith('resolve process-exit-module-resolve')).length, 1); + strictEqual(code, 42); + strictEqual(signal, null); + }); + + it('propagates the exit code from worker thread import exiting from load hook', async () => { + const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [ + '--no-warnings', + '--import', + `data:text/javascript,${encodeURIComponent(` + import { register } from 'node:module'; + register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-exit-worker.mjs'))}); + `)}`, + fixtures.path('es-module-loaders/worker-log-fail-worker-load.mjs'), + ]); + + strictEqual(stderr, ''); + strictEqual(stdout.split('\n').filter((line) => line.startsWith('resolve process-exit-module-load')).length, 1); + strictEqual(stdout.split('\n').filter((line) => line.startsWith('load process-exit-on-load:///')).length, 1); + strictEqual(code, 43); + strictEqual(signal, null); + }); + +}); diff --git a/test/es-module/test-esm-named-exports.js b/test/es-module/test-esm-named-exports.js index 2c6f67288aa57c..00b7aebbfd1f46 100644 --- a/test/es-module/test-esm-named-exports.js +++ b/test/es-module/test-esm-named-exports.js @@ -1,7 +1,9 @@ // Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs 'use strict'; -require('../common'); +const common = require('../common'); +common.skipIfWorker(); + const { readFile, __fromLoader } = require('fs'); const assert = require('assert'); diff --git a/test/es-module/test-esm-named-exports.mjs b/test/es-module/test-esm-named-exports.mjs index bbe9c96b92d9b8..6e584b05aa204f 100644 --- a/test/es-module/test-esm-named-exports.mjs +++ b/test/es-module/test-esm-named-exports.mjs @@ -1,9 +1,10 @@ // Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs -import '../common/index.mjs'; -import { readFile, __fromLoader } from 'fs'; +import { skipIfWorker } from '../common/index.mjs'; +import * as fs from 'fs'; import assert from 'assert'; import ok from '../fixtures/es-modules/test-esm-ok.mjs'; +skipIfWorker(); assert(ok); -assert(readFile); -assert(__fromLoader); +assert(fs.readFile); +assert(fs.__fromLoader); diff --git a/test/es-module/test-esm-virtual-json.mjs b/test/es-module/test-esm-virtual-json.mjs index a42b037fc1f200..1064a6af5026cf 100644 --- a/test/es-module/test-esm-virtual-json.mjs +++ b/test/es-module/test-esm-virtual-json.mjs @@ -1,7 +1,8 @@ -import '../common/index.mjs'; +import { skipIfWorker } from '../common/index.mjs'; import * as fixtures from '../common/fixtures.mjs'; import { register } from 'node:module'; import assert from 'node:assert'; +skipIfWorker(); async function resolve(referrer, context, next) { const result = await next(referrer, context); diff --git a/test/fixtures/es-module-loaders/builtin-named-exports.mjs b/test/fixtures/es-module-loaders/builtin-named-exports.mjs index 123b12c26bf0c9..4e22f631eba416 100644 --- a/test/fixtures/es-module-loaders/builtin-named-exports.mjs +++ b/test/fixtures/es-module-loaders/builtin-named-exports.mjs @@ -1,3 +1,4 @@ +import { isMainThread } from '../../common/index.mjs'; import * as fixtures from '../../common/fixtures.mjs'; import { createRequire, register } from 'node:module'; @@ -10,8 +11,10 @@ Object.defineProperty(globalThis, GET_BUILTIN, { configurable: false, }); -register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), { - data: { - GET_BUILTIN, - }, -}); +if (isMainThread) { + register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), { + data: { + GET_BUILTIN, + }, + }); +} diff --git a/test/fixtures/es-module-loaders/hooks-exit-worker.mjs b/test/fixtures/es-module-loaders/hooks-exit-worker.mjs new file mode 100644 index 00000000000000..d499a835e6456c --- /dev/null +++ b/test/fixtures/es-module-loaders/hooks-exit-worker.mjs @@ -0,0 +1,21 @@ +import { writeFileSync } from 'node:fs'; + +export function resolve(specifier, context, next) { + writeFileSync(1, `resolve ${specifier}\n`); + if (specifier === 'process-exit-module-resolve') { + process.exit(42); + } + + if (specifier === 'process-exit-module-load') { + return { __proto__: null, shortCircuit: true, url: 'process-exit-on-load:///' } + } + return next(specifier, context); +} + +export function load(url, context, next) { + writeFileSync(1, `load ${url}\n`); + if (url === 'process-exit-on-load:///') { + process.exit(43); + } + return next(url, context); +} diff --git a/test/fixtures/es-module-loaders/hooks-log.mjs b/test/fixtures/es-module-loaders/hooks-log.mjs new file mode 100644 index 00000000000000..2d2512281e8bd5 --- /dev/null +++ b/test/fixtures/es-module-loaders/hooks-log.mjs @@ -0,0 +1,19 @@ +import { writeFileSync } from 'node:fs'; + +let initializeCount = 0; +let resolveCount = 0; +let loadCount = 0; + +export function initialize() { + writeFileSync(1, `initialize ${++initializeCount}\n`); +} + +export function resolve(specifier, context, next) { + writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier}\n`); + return next(specifier, context); +} + +export function load(url, context, next) { + writeFileSync(1, `hooked load ${++loadCount} ${url}\n`); + return next(url, context); +} diff --git a/test/fixtures/es-module-loaders/not-found-assert-loader.mjs b/test/fixtures/es-module-loaders/not-found-assert-loader.mjs index bf66efbd0810e5..7d53e31df918a7 100644 --- a/test/fixtures/es-module-loaders/not-found-assert-loader.mjs +++ b/test/fixtures/es-module-loaders/not-found-assert-loader.mjs @@ -1,16 +1,13 @@ import assert from 'node:assert'; // A loader that asserts that the defaultResolve will throw "not found" -// (skipping the top-level main of course, and the built-in ones needed for run-worker). -let mainLoad = true; export async function resolve(specifier, { importAttributes }, next) { - if (mainLoad || specifier === 'path' || specifier === 'worker_threads') { - mainLoad = false; - return next(specifier); + if (specifier.startsWith('./not-found')) { + await assert.rejects(next(specifier), { code: 'ERR_MODULE_NOT_FOUND' }); + return { + url: 'node:fs', + importAttributes, + }; } - await assert.rejects(next(specifier), { code: 'ERR_MODULE_NOT_FOUND' }); - return { - url: 'node:fs', - importAttributes, - }; + return next(specifier); } diff --git a/test/fixtures/es-module-loaders/worker-fail-on-load.mjs b/test/fixtures/es-module-loaders/worker-fail-on-load.mjs new file mode 100644 index 00000000000000..46e88664a03c5c --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-fail-on-load.mjs @@ -0,0 +1 @@ +import 'process-exit-module-load'; diff --git a/test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs b/test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs new file mode 100644 index 00000000000000..e8e7adde42585f --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs @@ -0,0 +1 @@ +import 'process-exit-module-resolve'; diff --git a/test/fixtures/es-module-loaders/worker-log-again.mjs b/test/fixtures/es-module-loaders/worker-log-again.mjs new file mode 100644 index 00000000000000..2969edc8dac382 --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log-again.mjs @@ -0,0 +1,3 @@ +import { bar } from './module-named-exports.mjs'; + +console.log(bar); diff --git a/test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs b/test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs new file mode 100644 index 00000000000000..81797da392cb7a --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs @@ -0,0 +1,12 @@ +import { Worker } from 'worker_threads'; +import { foo } from './module-named-exports.mjs'; + +const workerURLFailOnLoad = new URL('./worker-fail-on-load.mjs', import.meta.url); +console.log(foo); + +// Spawn a worker that will fail to import a dependant module +new Worker(workerURLFailOnLoad); + +process.on('exit', (code) => { + console.log(`process exit code: ${code}`) +}); diff --git a/test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs b/test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs new file mode 100644 index 00000000000000..b5ff238967f4ef --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs @@ -0,0 +1,12 @@ +import { Worker } from 'worker_threads'; +import { foo } from './module-named-exports.mjs'; + +const workerURLFailOnResolve = new URL('./worker-fail-on-resolve.mjs', import.meta.url); +console.log(foo); + +// Spawn a worker that will fail to import a dependant module +new Worker(workerURLFailOnResolve); + +process.on('exit', (code) => { + console.log(`process exit code: ${code}`) +}); diff --git a/test/fixtures/es-module-loaders/worker-log.mjs b/test/fixtures/es-module-loaders/worker-log.mjs new file mode 100644 index 00000000000000..13290c37d07104 --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log.mjs @@ -0,0 +1,9 @@ +import { Worker } from 'worker_threads'; +import { foo } from './module-named-exports.mjs'; + +const workerURL = new URL('./worker-log-again.mjs', import.meta.url); +console.log(foo); + +// Spawn two workers +new Worker(workerURL); +new Worker(workerURL); diff --git a/test/fixtures/es-module-loaders/workers-spawned.mjs b/test/fixtures/es-module-loaders/workers-spawned.mjs new file mode 100644 index 00000000000000..439847656fe13e --- /dev/null +++ b/test/fixtures/es-module-loaders/workers-spawned.mjs @@ -0,0 +1,7 @@ +import { Worker } from 'worker_threads'; + +const workerURL = new URL('./worker-log.mjs', import.meta.url); + +// Spawn two workers +new Worker(workerURL); +new Worker(workerURL); From fd5ffdfcb97c3366f7c55f9764dea08fbbed4200 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 14:26:13 +0200 Subject: [PATCH 02/12] module: allow module.register from workers --- lib/internal/modules/esm/hooks.js | 25 ++++++++++++++++--- lib/internal/modules/esm/loader.js | 12 ++++----- lib/internal/worker.js | 2 ++ src/node_worker.cc | 9 +++++++ src/node_worker.h | 3 +++ test/es-module/test-esm-loader-hooks.mjs | 2 +- .../test-esm-loader-programmatically.mjs | 8 +++--- test/es-module/test-esm-loader-threads.mjs | 7 +++++- test/fixtures/es-module-loaders/hooks-log.mjs | 2 +- 9 files changed, 52 insertions(+), 18 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index f5833ad61cdb75..4086c5451dc4a2 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -1,8 +1,11 @@ 'use strict'; const { + ArrayPrototypeFilter, + ArrayPrototypeMap, ArrayPrototypePush, ArrayPrototypePushApply, + ArrayPrototypeReduce, AtomicsLoad, AtomicsWait, AtomicsWaitAsync, @@ -35,7 +38,7 @@ const { const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors'); const { URL } = require('internal/url'); const { canParse: URLCanParse } = internalBinding('url'); -const { receiveMessageOnPort, isMainThread } = require('worker_threads'); +const { receiveMessageOnPort } = require('worker_threads'); const { isAnyArrayBuffer, isArrayBufferView, @@ -164,6 +167,18 @@ class Hooks { * @returns {any | Promise} User data, ignored unless it's a promise, in which case it will be awaited. */ addCustomLoader(url, exports, data) { + const alreadyKnown = ArrayPrototypeReduce( + ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => { + if (this.#chains[hookName]) { + const res2 = ArrayPrototypeFilter(this.#chains[hookName], (el) => el.url === url); + return res2.length; + } + }), (acc, val) => acc || (val === 1), false); + + if (alreadyKnown) { + return undefined; + } + const { initialize, resolve, @@ -499,13 +514,14 @@ class HooksProxy { #numberOfPendingAsyncResponses = 0; #isReady = false; + #isWorkerOwner = false; constructor() { - const { InternalWorker, hooksPort } = require('internal/worker'); + const { InternalWorker, hooksPort, hasHooksThread } = require('internal/worker'); const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH); this.#lock = new Int32Array(lock); - if (isMainThread) { + if (!hasHooksThread()) { // Main thread is the only one that creates the internal single hooks worker this.#worker = new InternalWorker(loaderWorkerId, { stderr: false, @@ -518,6 +534,7 @@ class HooksProxy { }); this.#worker.unref(); // ! Allows the process to eventually exit. this.#worker.on('exit', process.exit); + this.#isWorkerOwner = true; this.#portToHooksThread = this.#worker; } else { this.#portToHooksThread = hooksPort; @@ -529,7 +546,7 @@ class HooksProxy { // has an InternalWorker. That was the Hooks instance created for the main thread. // It means for all Hooks instances that are not on the main thread => they are ready because they // delegate to the single InternalWorker anyway. - if (!isMainThread) { + if (!this.#isWorkerOwner) { return; } diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index e060b36eccacab..f38d2f5b0824c8 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -41,7 +41,6 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap'); const { urlToFilename, } = require('internal/modules/helpers'); -const { isMainThread } = require('worker_threads'); let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer; /** @@ -623,11 +622,9 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] } | undefined} */ register(originalSpecifier, parentURL, data, transferList) { - if (isMainThread) { - // Only the main thread has a Hooks instance with worker thread. All other Worker threads - // delegate their hooks to the HooksThread of the main thread. - return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); - } + // Only the main thread has a Hooks instance with worker thread. All other Worker threads + // delegate their hooks to the HooksThread of the main thread. + return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); } /** @@ -640,6 +637,9 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] }} */ resolve(originalSpecifier, parentURL, importAttributes) { + // const FS = require('fs'); + // const UTIL = require('util'); + // FS.writeFileSync(1, `resolve(${originalSpecifier}). ${Error().stack}\n`); return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes); } diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 766659d4464379..edb816797ab7bc 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -69,6 +69,7 @@ const { resourceLimits: resourceLimitsRaw, threadId, Worker: WorkerImpl, + hasHooksThread, kMaxYoungGenerationSizeMb, kMaxOldGenerationSizeMb, kCodeRangeSizeMb, @@ -562,6 +563,7 @@ module.exports = { isMainThread, SHARE_ENV, hooksPort: undefined, + hasHooksThread, resourceLimits: !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, setEnvironmentData, diff --git a/src/node_worker.cc b/src/node_worker.cc index bc43cb42934ba9..ccccff3b23b8cf 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -46,6 +46,7 @@ namespace node { namespace worker { constexpr double kMB = 1024 * 1024; +std::atomic_bool Worker::internalExists{false}; Worker::Worker(Environment* env, Local wrap, @@ -489,6 +490,8 @@ void Worker::New(const FunctionCallbackInfo& args) { if (is_internal->IsFalse()) { THROW_IF_INSUFFICIENT_PERMISSIONS( env, permission::PermissionScope::kWorkerThreads, ""); + } else { + internalExists = true; } Isolate* isolate = args.GetIsolate(); @@ -903,6 +906,10 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } +void Worker::HasHooksThreadAlready(const FunctionCallbackInfo& args) { + args.GetReturnValue().Set(Worker::internalExists); +} + namespace { // Return the MessagePort that is global for this Environment and communicates @@ -940,6 +947,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data, SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime); SetConstructorFunction(isolate, target, "Worker", w); + SetMethodNoSideEffect(isolate, target, "hasHooksThread", Worker::HasHooksThread); } { @@ -1011,6 +1019,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); + registry->Register(Worker::HasHooksThreadAlready); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index 07fd7b460654e1..97f38443a43b74 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -5,6 +5,7 @@ #include #include +#include #include "node_exit_code.h" #include "node_messaging.h" #include "uv.h" @@ -76,6 +77,7 @@ class Worker : public AsyncWrap { static void TakeHeapSnapshot(const v8::FunctionCallbackInfo& args); static void LoopIdleTime(const v8::FunctionCallbackInfo& args); static void LoopStartTime(const v8::FunctionCallbackInfo& args); + static void HasHooksThreadAlready(const v8::FunctionCallbackInfo& args); private: bool CreateEnvMessagePort(Environment* env); @@ -102,6 +104,7 @@ class Worker : public AsyncWrap { uintptr_t stack_base_ = 0; // Optional name used for debugging in inspector and trace events. std::string name_; + static std::atomic_bool internalExists; // Custom resource constraints: double resource_limits_[kTotalResourceLimitCount]; diff --git a/test/es-module/test-esm-loader-hooks.mjs b/test/es-module/test-esm-loader-hooks.mjs index 80dbd885e25819..c481be30e5cd4b 100644 --- a/test/es-module/test-esm-loader-hooks.mjs +++ b/test/es-module/test-esm-loader-hooks.mjs @@ -612,7 +612,7 @@ describe('Loader hooks', { concurrency: !process.env.TEST_PARALLEL }, () => { ]); assert.strictEqual(stderr, ''); - assert.deepStrictEqual(stdout.split('\n'), ['resolve passthru', 'resolve passthru', '']); + assert.deepStrictEqual(stdout.split('\n'), ['resolve passthru', '']); assert.strictEqual(code, 0); assert.strictEqual(signal, null); }); diff --git a/test/es-module/test-esm-loader-programmatically.mjs b/test/es-module/test-esm-loader-programmatically.mjs index e117ff454d7889..396321b1606363 100644 --- a/test/es-module/test-esm-loader-programmatically.mjs +++ b/test/es-module/test-esm-loader-programmatically.mjs @@ -163,12 +163,10 @@ describe('ESM: programmatically register loaders', { concurrency: !process.env.T const lines = stdout.split('\n'); assert.match(lines[0], /resolve passthru/); - assert.match(lines[1], /resolve passthru/); - assert.match(lines[2], /load passthru/); - assert.match(lines[3], /load passthru/); - assert.match(lines[4], /Hello from dynamic import/); + assert.match(lines[1], /load passthru/); + assert.match(lines[2], /Hello from dynamic import/); - assert.strictEqual(lines[5], ''); + assert.strictEqual(lines[3], ''); }); it('works registering loaders as package name', async () => { diff --git a/test/es-module/test-esm-loader-threads.mjs b/test/es-module/test-esm-loader-threads.mjs index 7310a9ac5b54ac..ee5e57d7867a1e 100644 --- a/test/es-module/test-esm-loader-threads.mjs +++ b/test/es-module/test-esm-loader-threads.mjs @@ -16,6 +16,9 @@ describe('off-thread hooks', { concurrency: true }, () => { fixtures.path('es-module-loaders/workers-spawned.mjs'), ]); + console.log(stderr); + console.log(stdout); + strictEqual(stderr, ''); strictEqual(stdout.split('\n').filter((line) => line.startsWith('initialize')).length, 1); strictEqual(stdout.split('\n').filter((line) => line === 'foo').length, 2); @@ -30,7 +33,9 @@ describe('off-thread hooks', { concurrency: true }, () => { // 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs // =========================== // 16 calls to resolve + 16 calls to load hook for the registered custom loader - strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 16); + // 6 additional calls to resolve because of the modeul.register being allowed from worker threads (happens + // implicitly because of the --import on the main thread) + strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 22); strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked load')).length, 16); strictEqual(code, 0); strictEqual(signal, null); diff --git a/test/fixtures/es-module-loaders/hooks-log.mjs b/test/fixtures/es-module-loaders/hooks-log.mjs index 2d2512281e8bd5..416bbad17fd6ac 100644 --- a/test/fixtures/es-module-loaders/hooks-log.mjs +++ b/test/fixtures/es-module-loaders/hooks-log.mjs @@ -9,7 +9,7 @@ export function initialize() { } export function resolve(specifier, context, next) { - writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier}\n`); + writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier} \n`); return next(specifier, context); } From f715679bb46bef590bf0ea223f60c9c134e2628a Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 15:36:18 +0200 Subject: [PATCH 03/12] reenable tests on workers --- test/es-module/test-esm-loader-mock.mjs | 7 +------ test/es-module/test-esm-named-exports.js | 3 +-- test/es-module/test-esm-named-exports.mjs | 9 ++++----- .../es-module-loaders/builtin-named-exports.mjs | 13 +++++-------- 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/test/es-module/test-esm-loader-mock.mjs b/test/es-module/test-esm-loader-mock.mjs index 0d39f549581a54..164d0ac3775039 100644 --- a/test/es-module/test-esm-loader-mock.mjs +++ b/test/es-module/test-esm-loader-mock.mjs @@ -1,11 +1,6 @@ -import { skipIfWorker } from '../common/index.mjs'; +import '../common/index.mjs'; import assert from 'node:assert/strict'; import { mock } from '../fixtures/es-module-loaders/mock.mjs'; -// Importing mock.mjs above will call `register` to modify the loaders chain. -// Modifying the loader chain is not supported currently when running from a worker thread. -// Relevant PR: https://github.com/nodejs/node/pull/52706 -// See comment: https://github.com/nodejs/node/pull/52706/files#r1585144580 -skipIfWorker(); mock('node:events', { EventEmitter: 'This is mocked!' diff --git a/test/es-module/test-esm-named-exports.js b/test/es-module/test-esm-named-exports.js index 00b7aebbfd1f46..25ce2f794c1165 100644 --- a/test/es-module/test-esm-named-exports.js +++ b/test/es-module/test-esm-named-exports.js @@ -1,8 +1,7 @@ // Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs 'use strict'; -const common = require('../common'); -common.skipIfWorker(); +require('../common'); const { readFile, __fromLoader } = require('fs'); const assert = require('assert'); diff --git a/test/es-module/test-esm-named-exports.mjs b/test/es-module/test-esm-named-exports.mjs index 6e584b05aa204f..bbe9c96b92d9b8 100644 --- a/test/es-module/test-esm-named-exports.mjs +++ b/test/es-module/test-esm-named-exports.mjs @@ -1,10 +1,9 @@ // Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs -import { skipIfWorker } from '../common/index.mjs'; -import * as fs from 'fs'; +import '../common/index.mjs'; +import { readFile, __fromLoader } from 'fs'; import assert from 'assert'; import ok from '../fixtures/es-modules/test-esm-ok.mjs'; -skipIfWorker(); assert(ok); -assert(fs.readFile); -assert(fs.__fromLoader); +assert(readFile); +assert(__fromLoader); diff --git a/test/fixtures/es-module-loaders/builtin-named-exports.mjs b/test/fixtures/es-module-loaders/builtin-named-exports.mjs index 4e22f631eba416..123b12c26bf0c9 100644 --- a/test/fixtures/es-module-loaders/builtin-named-exports.mjs +++ b/test/fixtures/es-module-loaders/builtin-named-exports.mjs @@ -1,4 +1,3 @@ -import { isMainThread } from '../../common/index.mjs'; import * as fixtures from '../../common/fixtures.mjs'; import { createRequire, register } from 'node:module'; @@ -11,10 +10,8 @@ Object.defineProperty(globalThis, GET_BUILTIN, { configurable: false, }); -if (isMainThread) { - register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), { - data: { - GET_BUILTIN, - }, - }); -} +register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), { + data: { + GET_BUILTIN, + }, +}); From 34bc39847aec7c35dad3732514dada95bfd17b0c Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 15:55:51 +0200 Subject: [PATCH 04/12] fix unfinished rename --- src/node_worker.cc | 4 ++-- src/node_worker.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/node_worker.cc b/src/node_worker.cc index ccccff3b23b8cf..5b5f28c4999c56 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -906,7 +906,7 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } -void Worker::HasHooksThreadAlready(const FunctionCallbackInfo& args) { +void Worker::HasHooksThread(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(Worker::internalExists); } @@ -1019,7 +1019,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); - registry->Register(Worker::HasHooksThreadAlready); + registry->Register(Worker::HasHooksThread); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index 97f38443a43b74..1bb5e43a9e6226 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -77,7 +77,7 @@ class Worker : public AsyncWrap { static void TakeHeapSnapshot(const v8::FunctionCallbackInfo& args); static void LoopIdleTime(const v8::FunctionCallbackInfo& args); static void LoopStartTime(const v8::FunctionCallbackInfo& args); - static void HasHooksThreadAlready(const v8::FunctionCallbackInfo& args); + static void HasHooksThread(const v8::FunctionCallbackInfo& args); private: bool CreateEnvMessagePort(Environment* env); From a33afea0baec61b3e14a81996e41c71522eab5d1 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 16:37:09 +0200 Subject: [PATCH 05/12] fixup! reenable tests on workers --- test/fixtures/es-module-loaders/builtin-named-exports.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures/es-module-loaders/builtin-named-exports.mjs b/test/fixtures/es-module-loaders/builtin-named-exports.mjs index 123b12c26bf0c9..32c29327ea6437 100644 --- a/test/fixtures/es-module-loaders/builtin-named-exports.mjs +++ b/test/fixtures/es-module-loaders/builtin-named-exports.mjs @@ -3,7 +3,7 @@ import { createRequire, register } from 'node:module'; const require = createRequire(import.meta.url); -const GET_BUILTIN = `$__get_builtin_hole_${Date.now()}`; +const GET_BUILTIN = `$__get_builtin_hole_${~~(Date.now()/10000)}`; Object.defineProperty(globalThis, GET_BUILTIN, { value: builtinName => require(builtinName), enumerable: false, From 332a22dd63d08bb4dc68b0e21e3ca96e004120c0 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 16:50:51 +0200 Subject: [PATCH 06/12] fixup! module: allow module.register from workers --- lib/internal/modules/esm/hooks.js | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index 4086c5451dc4a2..27b0d26ef0e713 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -5,7 +5,7 @@ const { ArrayPrototypeMap, ArrayPrototypePush, ArrayPrototypePushApply, - ArrayPrototypeReduce, + ArrayPrototypeSome, AtomicsLoad, AtomicsWait, AtomicsWaitAsync, @@ -167,13 +167,11 @@ class Hooks { * @returns {any | Promise} User data, ignored unless it's a promise, in which case it will be awaited. */ addCustomLoader(url, exports, data) { - const alreadyKnown = ArrayPrototypeReduce( - ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => { - if (this.#chains[hookName]) { - const res2 = ArrayPrototypeFilter(this.#chains[hookName], (el) => el.url === url); - return res2.length; - } - }), (acc, val) => acc || (val === 1), false); + const alreadyKnown = ArrayPrototypeSome(ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => { + if (this.#chains[hookName]) { + return ArrayPrototypeFilter(this.#chains[hookName], (el) => el.url === url).length === 1; + } + }), (el) => el); if (alreadyKnown) { return undefined; From c6213949b16b1f7de3337d7886fb1a052b6cc651 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 16:52:12 +0200 Subject: [PATCH 07/12] remove debugging output code --- lib/internal/modules/esm/loader.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index f38d2f5b0824c8..8583f1415e794f 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -637,9 +637,6 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] }} */ resolve(originalSpecifier, parentURL, importAttributes) { - // const FS = require('fs'); - // const UTIL = require('util'); - // FS.writeFileSync(1, `resolve(${originalSpecifier}). ${Error().stack}\n`); return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes); } From 89b8d6be0b71d90b48afc2c34e00babd9b382148 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Wed, 29 May 2024 16:57:06 +0200 Subject: [PATCH 08/12] lint --- src/node_worker.cc | 3 ++- src/node_worker.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/node_worker.cc b/src/node_worker.cc index 5b5f28c4999c56..1a812b44936398 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -947,7 +947,8 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data, SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime); SetConstructorFunction(isolate, target, "Worker", w); - SetMethodNoSideEffect(isolate, target, "hasHooksThread", Worker::HasHooksThread); + SetMethodNoSideEffect( + isolate, target, "hasHooksThread", Worker::HasHooksThread); } { diff --git a/src/node_worker.h b/src/node_worker.h index 1bb5e43a9e6226..915940dcaf1df5 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -3,9 +3,9 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#include #include #include -#include #include "node_exit_code.h" #include "node_messaging.h" #include "uv.h" From 8fe957eb08010abc2b76ff2d78d1ca6c94b835cb Mon Sep 17 00:00:00 2001 From: Gabriel Bota <94833492+dygabo@users.noreply.github.com> Date: Wed, 29 May 2024 17:39:45 +0200 Subject: [PATCH 09/12] Apply suggestions from code review Co-authored-by: Geoffrey Booth --- test/es-module/test-esm-loader-threads.mjs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/es-module/test-esm-loader-threads.mjs b/test/es-module/test-esm-loader-threads.mjs index ee5e57d7867a1e..9bd90e187b98ab 100644 --- a/test/es-module/test-esm-loader-threads.mjs +++ b/test/es-module/test-esm-loader-threads.mjs @@ -16,9 +16,6 @@ describe('off-thread hooks', { concurrency: true }, () => { fixtures.path('es-module-loaders/workers-spawned.mjs'), ]); - console.log(stderr); - console.log(stdout); - strictEqual(stderr, ''); strictEqual(stdout.split('\n').filter((line) => line.startsWith('initialize')).length, 1); strictEqual(stdout.split('\n').filter((line) => line === 'foo').length, 2); @@ -33,7 +30,7 @@ describe('off-thread hooks', { concurrency: true }, () => { // 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs // =========================== // 16 calls to resolve + 16 calls to load hook for the registered custom loader - // 6 additional calls to resolve because of the modeul.register being allowed from worker threads (happens + // 6 additional calls to resolve because of the module.register being allowed from worker threads (happens // implicitly because of the --import on the main thread) strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 22); strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked load')).length, 16); From 75dc437628ac80f8447d8f742fa1f0704d198fab Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Thu, 30 May 2024 13:22:19 +0200 Subject: [PATCH 10/12] make worker instantiation thread safe + some other review findings --- lib/internal/modules/esm/hooks.js | 26 +++++++++++++++++--------- lib/internal/modules/esm/loader.js | 2 -- src/node_errors.h | 1 + src/node_worker.cc | 15 +++++++++++++-- src/node_worker.h | 2 ++ 5 files changed, 33 insertions(+), 13 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index 27b0d26ef0e713..e6ceed7e137d60 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -515,12 +515,15 @@ class HooksProxy { #isWorkerOwner = false; constructor() { - const { InternalWorker, hooksPort, hasHooksThread } = require('internal/worker'); + const { InternalWorker, hooksPort } = require('internal/worker'); const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH); this.#lock = new Int32Array(lock); - if (!hasHooksThread()) { - // Main thread is the only one that creates the internal single hooks worker + try { + // The customization hooks thread is only created once. All other threads reuse + // the existing instance. If another thread created it, the constructor will throw + // Fallback is to use the existing hooksPort created by the thread that originally + // spawned the customization hooks thread. this.#worker = new InternalWorker(loaderWorkerId, { stderr: false, stdin: false, @@ -534,16 +537,21 @@ class HooksProxy { this.#worker.on('exit', process.exit); this.#isWorkerOwner = true; this.#portToHooksThread = this.#worker; - } else { - this.#portToHooksThread = hooksPort; + } catch (e) { + if (e.code === 'ERR_HOOKS_THREAD_EXISTS') { + this.#portToHooksThread = hooksPort; + } else { + throw e; + } } } waitForWorker() { - // There is one Hooks instance for each worker thread. But only one of these Hooks instances - // has an InternalWorker. That was the Hooks instance created for the main thread. - // It means for all Hooks instances that are not on the main thread => they are ready because they - // delegate to the single InternalWorker anyway. + // There is one Hooks instance for each worker thread. But only one of + // these Hooks instances has an InternalWorker. That was the Hooks instance + // created for the first thread that registers a hook set. + // It means for all Hooks instances that are not on that thread => they are + // done because they delegate to the single InternalWorker. if (!this.#isWorkerOwner) { return; } diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index 8583f1415e794f..1724ae663b2008 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -622,8 +622,6 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] } | undefined} */ register(originalSpecifier, parentURL, data, transferList) { - // Only the main thread has a Hooks instance with worker thread. All other Worker threads - // delegate their hooks to the HooksThread of the main thread. return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); } diff --git a/src/node_errors.h b/src/node_errors.h index 0a74373cf5d333..9aeb154f7cd8dc 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -70,6 +70,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details); V(ERR_DLOPEN_FAILED, Error) \ V(ERR_ENCODING_INVALID_ENCODED_DATA, TypeError) \ V(ERR_EXECUTION_ENVIRONMENT_NOT_AVAILABLE, Error) \ + V(ERR_HOOKS_THREAD_EXISTS, Error) \ V(ERR_ILLEGAL_CONSTRUCTOR, Error) \ V(ERR_INVALID_ADDRESS, Error) \ V(ERR_INVALID_ARG_VALUE, TypeError) \ diff --git a/src/node_worker.cc b/src/node_worker.cc index 1a812b44936398..87107867b791e0 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -47,6 +47,7 @@ namespace worker { constexpr double kMB = 1024 * 1024; std::atomic_bool Worker::internalExists{false}; +Mutex Worker::instantiationMutex; Worker::Worker(Environment* env, Local wrap, @@ -484,18 +485,28 @@ Worker::~Worker() { } void Worker::New(const FunctionCallbackInfo& args) { + Mutex::ScopedLock lock(instantiationMutex); Environment* env = Environment::GetCurrent(args); auto is_internal = args[5]; CHECK(is_internal->IsBoolean()); if (is_internal->IsFalse()) { THROW_IF_INSUFFICIENT_PERMISSIONS( env, permission::PermissionScope::kWorkerThreads, ""); - } else { - internalExists = true; } Isolate* isolate = args.GetIsolate(); CHECK(args.IsConstructCall()); + auto creatingHooksThread = is_internal->IsTrue(); + + if (creatingHooksThread && internalExists) { + isolate->ThrowException(ERR_HOOKS_THREAD_EXISTS( + isolate, "Customization hooks thread already exists")); + return; + } + + if (creatingHooksThread) { + internalExists = true; + } if (env->isolate_data()->platform() == nullptr) { THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env); diff --git a/src/node_worker.h b/src/node_worker.h index 915940dcaf1df5..51cefa42972411 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -105,6 +105,8 @@ class Worker : public AsyncWrap { // Optional name used for debugging in inspector and trace events. std::string name_; static std::atomic_bool internalExists; + // this mutex is to synchronize ::New calls + static Mutex instantiationMutex; // Custom resource constraints: double resource_limits_[kTotalResourceLimitCount]; From f94be0a13987b53e973e76e12030ba4f6921c32c Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Fri, 31 May 2024 14:51:34 +0200 Subject: [PATCH 11/12] ongoing work for hook ownership --- lib/internal/modules/esm/hooks.js | 9 ++++++--- lib/internal/modules/esm/loader.js | 7 ++++--- test/fixtures/es-module-loaders/hooks-input.mjs | 1 + 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index e6ceed7e137d60..05dfed0b7793b2 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -169,7 +169,8 @@ class Hooks { addCustomLoader(url, exports, data) { const alreadyKnown = ArrayPrototypeSome(ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => { if (this.#chains[hookName]) { - return ArrayPrototypeFilter(this.#chains[hookName], (el) => el.url === url).length === 1; + return ArrayPrototypeFilter( + this.#chains[hookName], (el) => el.url === url && el.data === data).length === 1; } }), (el) => el); @@ -185,11 +186,11 @@ class Hooks { if (resolve) { const next = this.#chains.resolve[this.#chains.resolve.length - 1]; - ArrayPrototypePush(this.#chains.resolve, { __proto__: null, fn: resolve, url, next }); + ArrayPrototypePush(this.#chains.resolve, { __proto__: null, fn: resolve, url, data, next }); } if (load) { const next = this.#chains.load[this.#chains.load.length - 1]; - ArrayPrototypePush(this.#chains.load, { __proto__: null, fn: load, url, next }); + ArrayPrototypePush(this.#chains.load, { __proto__: null, fn: load, url, data, next }); } return initialize?.(data); } @@ -211,6 +212,7 @@ class Hooks { originalSpecifier, parentURL, importAttributes = { __proto__: null }, + threadId ) { throwIfInvalidParentURL(parentURL); @@ -219,6 +221,7 @@ class Hooks { conditions: getDefaultConditions(), importAttributes, parentURL, + threadId, }; const meta = { chainFinished: null, diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index 1724ae663b2008..bd52ccab95feb8 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap'); const { urlToFilename, } = require('internal/modules/helpers'); +const { threadId } = require('worker_threads'); let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer; /** @@ -500,7 +501,7 @@ class ModuleLoader { */ resolve(originalSpecifier, parentURL, importAttributes) { if (this.#customizations) { - return this.#customizations.resolve(originalSpecifier, parentURL, importAttributes); + return this.#customizations.resolve(originalSpecifier, parentURL, importAttributes, threadId); } const requestKey = this.#resolveCache.serializeKey(originalSpecifier, importAttributes); const cachedResult = this.#resolveCache.get(requestKey, parentURL); @@ -622,7 +623,7 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] } | undefined} */ register(originalSpecifier, parentURL, data, transferList) { - return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); + return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data, threadId); } /** @@ -635,7 +636,7 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] }} */ resolve(originalSpecifier, parentURL, importAttributes) { - return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes); + return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes, threadId); } resolveSync(originalSpecifier, parentURL, importAttributes) { diff --git a/test/fixtures/es-module-loaders/hooks-input.mjs b/test/fixtures/es-module-loaders/hooks-input.mjs index 1d3759f458224e..81b75d41dfda87 100644 --- a/test/fixtures/es-module-loaders/hooks-input.mjs +++ b/test/fixtures/es-module-loaders/hooks-input.mjs @@ -37,6 +37,7 @@ export async function resolve(specifier, context, next) { 'conditions', 'importAttributes', 'parentURL', + 'threadId', ]); assert.ok(Array.isArray(context.conditions)); assert.strictEqual(typeof next, 'function'); From d9169d48eb84afe50ae086973210e3c12fb61460 Mon Sep 17 00:00:00 2001 From: Gabriel Bota Date: Fri, 31 May 2024 15:04:42 +0200 Subject: [PATCH 12/12] rename to --- lib/internal/modules/esm/hooks.js | 10 +++++----- lib/internal/worker.js | 8 ++++---- src/node_worker.cc | 8 ++++---- src/node_worker.h | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index 05dfed0b7793b2..7add585f33192f 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -494,7 +494,7 @@ class HooksProxy { */ #lock; /** - * The InternalWorker instance, which lets us communicate with the loader thread. + * The HooksWorker instance, which lets us communicate with the loader thread. */ #worker; @@ -518,7 +518,7 @@ class HooksProxy { #isWorkerOwner = false; constructor() { - const { InternalWorker, hooksPort } = require('internal/worker'); + const { HooksWorker, hooksPort } = require('internal/worker'); const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH); this.#lock = new Int32Array(lock); @@ -527,7 +527,7 @@ class HooksProxy { // the existing instance. If another thread created it, the constructor will throw // Fallback is to use the existing hooksPort created by the thread that originally // spawned the customization hooks thread. - this.#worker = new InternalWorker(loaderWorkerId, { + this.#worker = new HooksWorker(loaderWorkerId, { stderr: false, stdin: false, stdout: false, @@ -551,10 +551,10 @@ class HooksProxy { waitForWorker() { // There is one Hooks instance for each worker thread. But only one of - // these Hooks instances has an InternalWorker. That was the Hooks instance + // these Hooks instances has a HooksWorker. That was the Hooks instance // created for the first thread that registers a hook set. // It means for all Hooks instances that are not on that thread => they are - // done because they delegate to the single InternalWorker. + // done because they delegate to the single HooksWorker. if (!this.#isWorkerOwner) { return; } diff --git a/lib/internal/worker.js b/lib/internal/worker.js index edb816797ab7bc..66cd0816096f2b 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -482,10 +482,10 @@ class Worker extends EventEmitter { } /** - * A worker which has an internal module for entry point (e.g. internal/module/esm/worker). - * Internal workers bypass the permission model. + * A worker which is used to by the customization hooks thread (internal/module/esm/worker). + * This bypasses the permission model. */ -class InternalWorker extends Worker { +class HooksWorker extends Worker { constructor(filename, options) { super(filename, options, kIsInternal); } @@ -570,6 +570,6 @@ module.exports = { getEnvironmentData, assignEnvironmentData, threadId, - InternalWorker, + HooksWorker, Worker, }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 87107867b791e0..9c383802ed1ac6 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -46,7 +46,7 @@ namespace node { namespace worker { constexpr double kMB = 1024 * 1024; -std::atomic_bool Worker::internalExists{false}; +std::atomic_bool Worker::hooksWorkerExists{false}; Mutex Worker::instantiationMutex; Worker::Worker(Environment* env, @@ -498,14 +498,14 @@ void Worker::New(const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); auto creatingHooksThread = is_internal->IsTrue(); - if (creatingHooksThread && internalExists) { + if (creatingHooksThread && hooksWorkerExists) { isolate->ThrowException(ERR_HOOKS_THREAD_EXISTS( isolate, "Customization hooks thread already exists")); return; } if (creatingHooksThread) { - internalExists = true; + hooksWorkerExists = true; } if (env->isolate_data()->platform() == nullptr) { @@ -918,7 +918,7 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { } void Worker::HasHooksThread(const FunctionCallbackInfo& args) { - args.GetReturnValue().Set(Worker::internalExists); + args.GetReturnValue().Set(Worker::hooksWorkerExists); } namespace { diff --git a/src/node_worker.h b/src/node_worker.h index 51cefa42972411..e56cffa8808b1d 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -104,7 +104,7 @@ class Worker : public AsyncWrap { uintptr_t stack_base_ = 0; // Optional name used for debugging in inspector and trace events. std::string name_; - static std::atomic_bool internalExists; + static std::atomic_bool hooksWorkerExists; // this mutex is to synchronize ::New calls static Mutex instantiationMutex;