Skip to content

Commit

Permalink
fix(core): allow configuring plugin message timeout (#27315)
Browse files Browse the repository at this point in the history
<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
Plugin messages timeout if they don't hear back in 5 minutes. This adds
timeouts to createNodes and other plugin APIs invoked during graph
construction, which didn't previously exist

## Expected Behavior
Increased timeout + easy configuration via env var

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
  • Loading branch information
AgentEnder committed Aug 7, 2024
1 parent dfd7241 commit 0f193e2
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/shared/reference/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The following environment variables are ones that you can set to change the beha
| NX_MIGRATE_CLI_VERSION | string | The version of Nx to use for running the `nx migrate` command. If not set, it defaults to `latest`. |
| NX_LOAD_DOT_ENV_FILES | boolean | If set to 'false', Nx will not load any environment files (e.g. `.local.env`, `.env.local`) |
| NX_NATIVE_FILE_CACHE_DIRECTORY | string | The cache for native `.node` files is stored under a global temp directory by default. Set this variable to use a different directory. This is interpreted as an absolute path. |
| NX_PLUGIN_NO_TIMEOUTS | boolean | If set to `true`, plugin operations will not timeout |

Nx will set the following environment variables so they can be accessible within the process even outside of executors and generators.

Expand Down
124 changes: 90 additions & 34 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ const cleanupFunctions = new Set<() => void>();

const pluginNames = new Map<ChildProcess, string>();

const MAX_MESSAGE_WAIT = 1000 * 60 * 5; // 5 minutes
const PLUGIN_TIMEOUT_HINT_TEXT =
'As a last resort, you can set NX_PLUGIN_NO_TIMEOUTS=true to bypass this timeout.';

const MINUTES = 10;

const MAX_MESSAGE_WAIT =
process.env.NX_PLUGIN_NO_TIMEOUTS === 'true'
? undefined
: 1000 * 60 * MINUTES; // 10 minutes

interface PendingPromise {
promise: Promise<unknown>;
Expand Down Expand Up @@ -67,9 +75,15 @@ export async function loadRemoteNxPlugin(
});
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);

const loadTimeout = setTimeout(() => {
rej(new Error('Plugin worker timed out when loading plugin:' + plugin));
}, MAX_MESSAGE_WAIT);
const loadTimeout = MAX_MESSAGE_WAIT
? setTimeout(() => {
rej(
new Error(
`Loading "${plugin}" timed out after ${MINUTES} minutes. ${PLUGIN_TIMEOUT_HINT_TEXT}`
)
);
}, MAX_MESSAGE_WAIT)
: undefined;

socket.on(
'data',
Expand All @@ -78,7 +92,7 @@ export async function loadRemoteNxPlugin(
worker,
pendingPromises,
(val) => {
clearTimeout(loadTimeout);
if (loadTimeout) clearTimeout(loadTimeout);
res(val);
},
rej,
Expand Down Expand Up @@ -144,49 +158,81 @@ function createWorkerHandler(
(configFiles, ctx) => {
const tx =
pluginName + worker.pid + ':createNodes:' + txId++;
return registerPendingPromise(tx, pending, () => {
sendMessageOverSocket(socket, {
type: 'createNodes',
payload: { configFiles, context: ctx, tx },
});
});
return registerPendingPromise(
tx,
pending,
() => {
sendMessageOverSocket(socket, {
type: 'createNodes',
payload: { configFiles, context: ctx, tx },
});
},
{
plugin: pluginName,
operation: 'createNodes',
}
);
},
]
: undefined,
createDependencies: result.hasCreateDependencies
? (ctx) => {
const tx =
pluginName + worker.pid + ':createDependencies:' + txId++;
return registerPendingPromise(tx, pending, () => {
sendMessageOverSocket(socket, {
type: 'createDependencies',
payload: { context: ctx, tx },
});
});
return registerPendingPromise(
tx,
pending,
() => {
sendMessageOverSocket(socket, {
type: 'createDependencies',
payload: { context: ctx, tx },
});
},
{
plugin: pluginName,
operation: 'createDependencies',
}
);
}
: undefined,
processProjectGraph: result.hasProcessProjectGraph
? (graph, ctx) => {
const tx =
pluginName + worker.pid + ':processProjectGraph:' + txId++;
return registerPendingPromise(tx, pending, () => {
sendMessageOverSocket(socket, {
type: 'processProjectGraph',
payload: { graph, ctx, tx },
});
});
return registerPendingPromise(
tx,
pending,
() => {
sendMessageOverSocket(socket, {
type: 'processProjectGraph',
payload: { graph, ctx, tx },
});
},
{
operation: 'processProjectGraph',
plugin: pluginName,
}
);
}
: undefined,
createMetadata: result.hasCreateMetadata
? (graph, ctx) => {
const tx =
pluginName + worker.pid + ':createMetadata:' + txId++;
return registerPendingPromise(tx, pending, () => {
sendMessageOverSocket(socket, {
type: 'createMetadata',
payload: { graph, context: ctx, tx },
});
});
return registerPendingPromise(
tx,
pending,
() => {
sendMessageOverSocket(socket, {
type: 'createMetadata',
payload: { graph, context: ctx, tx },
});
},
{
plugin: pluginName,
operation: 'createMetadata',
}
);
}
: undefined,
});
Expand Down Expand Up @@ -265,22 +311,32 @@ process.on('SIGTERM', exitHandler);
function registerPendingPromise(
tx: string,
pending: Map<string, PendingPromise>,
callback: () => void
callback: () => void,
context: {
plugin: string;
operation: string;
}
): Promise<any> {
let resolver, rejector, timeout;

const promise = new Promise((res, rej) => {
rejector = rej;
resolver = res;

timeout = setTimeout(() => {
rej(new Error(`Plugin worker timed out when processing message ${tx}`));
}, MAX_MESSAGE_WAIT);
timeout = MAX_MESSAGE_WAIT
? setTimeout(() => {
rej(
new Error(
`${context.plugin} timed out after ${MINUTES} minutes during ${context.operation}. ${PLUGIN_TIMEOUT_HINT_TEXT}`
)
);
}, MAX_MESSAGE_WAIT)
: undefined;

callback();
}).finally(() => {
pending.delete(tx);
clearTimeout(timeout);
if (timeout) clearTimeout(timeout);
});

pending.set(tx, {
Expand Down

0 comments on commit 0f193e2

Please sign in to comment.