diff --git a/docs/generated/devkit/TargetConfiguration.md b/docs/generated/devkit/TargetConfiguration.md index 05bb7fdac40d6..1760db6290783 100644 --- a/docs/generated/devkit/TargetConfiguration.md +++ b/docs/generated/devkit/TargetConfiguration.md @@ -15,6 +15,7 @@ Target's configuration - [cache](../../devkit/documents/TargetConfiguration#cache): boolean - [command](../../devkit/documents/TargetConfiguration#command): string - [configurations](../../devkit/documents/TargetConfiguration#configurations): Object +- [continuous](../../devkit/documents/TargetConfiguration#continuous): boolean - [defaultConfiguration](../../devkit/documents/TargetConfiguration#defaultconfiguration): string - [dependsOn](../../devkit/documents/TargetConfiguration#dependson): (string | TargetDependencyConfig)[] - [executor](../../devkit/documents/TargetConfiguration#executor): string @@ -55,6 +56,14 @@ Sets of options --- +### continuous + +• `Optional` **continuous**: `boolean` + +Whether this target runs continuously + +--- + ### defaultConfiguration • `Optional` **defaultConfiguration**: `string` diff --git a/docs/generated/devkit/Task.md b/docs/generated/devkit/Task.md index 10f6d600f80f1..78fa965f4147b 100644 --- a/docs/generated/devkit/Task.md +++ b/docs/generated/devkit/Task.md @@ -7,6 +7,7 @@ A representation of the invocation of an Executor ### Properties - [cache](../../devkit/documents/Task#cache): boolean +- [continuous](../../devkit/documents/Task#continuous): boolean - [endTime](../../devkit/documents/Task#endtime): number - [hash](../../devkit/documents/Task#hash): string - [hashDetails](../../devkit/documents/Task#hashdetails): Object @@ -28,6 +29,14 @@ Determines if a given task should be cacheable. --- +### continuous + +• `Optional` **continuous**: `boolean` + +This denotes if the task runs continuously + +--- + ### endTime • `Optional` **endTime**: `number` diff --git a/docs/generated/devkit/TaskGraph.md b/docs/generated/devkit/TaskGraph.md index 35def5738bb1c..3a40554e162ef 100644 --- a/docs/generated/devkit/TaskGraph.md +++ b/docs/generated/devkit/TaskGraph.md @@ -6,12 +6,19 @@ Graph of Tasks to be executed ### Properties +- [continuousDependencies](../../devkit/documents/TaskGraph#continuousdependencies): Record - [dependencies](../../devkit/documents/TaskGraph#dependencies): Record - [roots](../../devkit/documents/TaskGraph#roots): string[] - [tasks](../../devkit/documents/TaskGraph#tasks): Record ## Properties +### continuousDependencies + +• **continuousDependencies**: `Record`\<`string`, `string`[]\> + +--- + ### dependencies • **dependencies**: `Record`\<`string`, `string`[]\> diff --git a/packages/js/src/utils/buildable-libs-utils.spec.ts b/packages/js/src/utils/buildable-libs-utils.spec.ts index f9cb121e31be4..0ef263106698b 100644 --- a/packages/js/src/utils/buildable-libs-utils.spec.ts +++ b/packages/js/src/utils/buildable-libs-utils.spec.ts @@ -394,6 +394,7 @@ describe('calculateDependenciesFromTaskGraph', () => { 'lib3:build': [], 'lib4:build': [], }, + continuousDependencies: {}, roots: [], tasks: { 'lib1:build': { @@ -402,6 +403,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build': { id: 'lib2:build', @@ -409,6 +411,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build-base': { id: 'lib2:build-base', @@ -416,6 +419,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build': { id: 'lib3:build', @@ -423,6 +427,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build': { id: 'lib4:build', @@ -430,6 +435,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, }, }; @@ -569,6 +575,7 @@ describe('calculateDependenciesFromTaskGraph', () => { 'lib4:build': ['lib4:build-base'], 'lib4:build-base': [], }, + continuousDependencies: {}, roots: [], tasks: { 'lib1:build': { @@ -577,6 +584,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib1:build-base': { id: 'lib1:build-base', @@ -584,6 +592,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib1', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build': { id: 'lib2:build', @@ -591,6 +600,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib2:build-base': { id: 'lib2:build-base', @@ -598,6 +608,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib2', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build': { id: 'lib3:build', @@ -605,6 +616,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib3:build-base': { id: 'lib3:build-base', @@ -612,6 +624,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib3', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build': { id: 'lib4:build', @@ -619,6 +632,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build' }, outputs: [], parallelism: true, + continuous: false, }, 'lib4:build-base': { id: 'lib4:build-base', @@ -626,6 +640,7 @@ describe('calculateDependenciesFromTaskGraph', () => { target: { project: 'lib4', target: 'build-base' }, outputs: [], parallelism: true, + continuous: false, }, }, }; @@ -717,6 +732,7 @@ describe('calculateDependenciesFromTaskGraph', () => { // not relevant for this test case const taskGraph: TaskGraph = { dependencies: {}, + continuousDependencies: {}, roots: [], tasks: {}, }; diff --git a/packages/nx/package.json b/packages/nx/package.json index 0703927b6c7a0..f3506a7ca4b07 100644 --- a/packages/nx/package.json +++ b/packages/nx/package.json @@ -66,6 +66,7 @@ "string-width": "^4.2.3", "tar-stream": "~2.2.0", "tmp": "~0.2.1", + "tree-kill": "^1.2.2", "tsconfig-paths": "^4.1.2", "tslib": "^2.3.0", "yaml": "^2.6.0", diff --git a/packages/nx/src/command-line/graph/graph.ts b/packages/nx/src/command-line/graph/graph.ts index e355d7d137682..5e172ab617cd6 100644 --- a/packages/nx/src/command-line/graph/graph.ts +++ b/packages/nx/src/command-line/graph/graph.ts @@ -980,6 +980,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): { taskGraphs[taskId] = { tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], }; @@ -1006,6 +1007,7 @@ function getAllTaskGraphsForWorkspace(projectGraph: ProjectGraph): { taskGraphs[taskId] = { tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], }; diff --git a/packages/nx/src/commands-runner/get-command-projects.ts b/packages/nx/src/commands-runner/get-command-projects.ts index 3d9c669e85fb5..aaacb863f504d 100644 --- a/packages/nx/src/commands-runner/get-command-projects.ts +++ b/packages/nx/src/commands-runner/get-command-projects.ts @@ -1,5 +1,4 @@ import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph'; -import { removeIdsFromGraph } from '../tasks-runner/utils'; import { NxArgs } from '../utils/command-line-utils'; import { CommandGraph } from './command-graph'; import { createCommandGraph } from './create-command-graph'; @@ -34,3 +33,35 @@ function getSortedProjects( return getSortedProjects(newGraph, sortedProjects); } + +function removeIdsFromGraph( + graph: { + roots: string[]; + dependencies: Record; + }, + ids: string[], + mapWithIds: Record +): { + mapWithIds: Record; + roots: string[]; + dependencies: Record; +} { + const filteredMapWithIds = {}; + const dependencies = {}; + const removedSet = new Set(ids); + for (let id of Object.keys(mapWithIds)) { + if (!removedSet.has(id)) { + filteredMapWithIds[id] = mapWithIds[id]; + dependencies[id] = graph.dependencies[id].filter( + (depId) => !removedSet.has(depId) + ); + } + } + return { + mapWithIds: filteredMapWithIds, + dependencies: dependencies, + roots: Object.keys(dependencies).filter( + (k) => dependencies[k].length === 0 + ), + }; +} diff --git a/packages/nx/src/config/task-graph.ts b/packages/nx/src/config/task-graph.ts index 9829af409e10f..e65f7d3f74b95 100644 --- a/packages/nx/src/config/task-graph.ts +++ b/packages/nx/src/config/task-graph.ts @@ -81,6 +81,11 @@ export interface Task { * Determines if a given task should be parallelizable. */ parallelism: boolean; + + /** + * This denotes if the task runs continuously + */ + continuous?: boolean; } /** @@ -99,4 +104,6 @@ export interface TaskGraph { * Map of Task IDs to IDs of tasks which the task depends on */ dependencies: Record; + + continuousDependencies: Record; } diff --git a/packages/nx/src/config/workspace-json-project-json.ts b/packages/nx/src/config/workspace-json-project-json.ts index 78c56cf71a814..b4bab33f94ec1 100644 --- a/packages/nx/src/config/workspace-json-project-json.ts +++ b/packages/nx/src/config/workspace-json-project-json.ts @@ -258,6 +258,11 @@ export interface TargetConfiguration { */ parallelism?: boolean; + /** + * Whether this target runs continuously + */ + continuous?: boolean; + /** * List of generators to run before the target to ensure the workspace * is up to date. diff --git a/packages/nx/src/executors/run-commands/run-commands.impl.ts b/packages/nx/src/executors/run-commands/run-commands.impl.ts index 4df466e46f1f4..98b22fa0404b1 100644 --- a/packages/nx/src/executors/run-commands/run-commands.impl.ts +++ b/packages/nx/src/executors/run-commands/run-commands.impl.ts @@ -1,54 +1,35 @@ -import { ChildProcess, exec, Serializable } from 'child_process'; -import * as path from 'path'; +import { Serializable } from 'child_process'; import * as yargsParser from 'yargs-parser'; -import { env as appendLocalEnv } from 'npm-run-path'; import { ExecutorContext } from '../../config/misc-interfaces'; -import * as chalk from 'chalk'; import { getPseudoTerminal, PseudoTerminal, - PseudoTtyProcess, } from '../../tasks-runner/pseudo-terminal'; import { signalToCode } from '../../utils/exit-codes'; -import { - loadAndExpandDotEnvFile, - unloadDotEnvFile, -} from '../../tasks-runner/task-env'; +import { ParallelRunningTasks, SeriallyRunningTasks } from './running-tasks'; export const LARGE_BUFFER = 1024 * 1000000; -let pseudoTerminal: PseudoTerminal | null; -const childProcesses = new Set(); - -function loadEnvVarsFile(path: string, env: Record = {}) { - unloadDotEnvFile(path, env); - const result = loadAndExpandDotEnvFile(path, env); - if (result.error) { - throw result.error; - } -} - export type Json = { [k: string]: any; }; +export interface RunCommandsCommandOptions { + command: string; + forwardAllArgs?: boolean; + /** + * description was added to allow users to document their commands inline, + * it is not intended to be used as part of the execution of the command. + */ + description?: string; + prefix?: string; + prefixColor?: string; + color?: string; + bgColor?: string; +} + export interface RunCommandsOptions extends Json { command?: string | string[]; - commands?: ( - | { - command: string; - forwardAllArgs?: boolean; - /** - * description was added to allow users to document their commands inline, - * it is not intended to be used as part of the execution of the command. - */ - description?: string; - prefix?: string; - prefixColor?: string; - color?: string; - bgColor?: string; - } - | string - )[]; + commands?: Array; color?: boolean; parallel?: boolean; readyWhen?: string | string[]; @@ -108,7 +89,18 @@ export default async function ( success: boolean; terminalOutput: string; }> { - registerProcessListener(); + const task = await runCommands(options, context); + const results = await task.getResults(); + return { + ...results, + success: results.code === 0, + }; +} + +export async function runCommands( + options: RunCommandsOptions, + context: ExecutorContext +) { const normalized = normalizeOptions(options); if (normalized.readyWhenStatus.length && !normalized.parallel) { @@ -128,11 +120,18 @@ export default async function ( ); } + const pseudoTerminal = + !options.parallel && PseudoTerminal.isSupported() + ? getPseudoTerminal() + : null; + try { - const result = options.parallel - ? await runInParallel(normalized, context) - : await runSerially(normalized, context); - return result; + const runningTask = options.parallel + ? new ParallelRunningTasks(normalized, context) + : new SeriallyRunningTasks(normalized, context, pseudoTerminal); + + registerProcessListener(runningTask, pseudoTerminal); + return runningTask; } catch (e) { if (process.env.NX_VERBOSE_LOGGING === 'true') { console.error(e); @@ -143,77 +142,6 @@ export default async function ( } } -async function runInParallel( - options: NormalizedRunCommandsOptions, - context: ExecutorContext -): Promise<{ success: boolean; terminalOutput: string }> { - const procs = options.commands.map((c) => - createProcess( - null, - c, - options.readyWhenStatus, - options.color, - calculateCwd(options.cwd, context), - options.env ?? {}, - true, - options.usePty, - options.streamOutput, - options.tty, - options.envFile - ).then((result: { success: boolean; terminalOutput: string }) => ({ - result, - command: c.command, - })) - ); - - let terminalOutput = ''; - if (options.readyWhenStatus.length) { - const r: { - result: { success: boolean; terminalOutput: string }; - command: string; - } = await Promise.race(procs); - terminalOutput += r.result.terminalOutput; - if (!r.result.success) { - const output = `Warning: command "${r.command}" exited with non-zero status code`; - terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { success: false, terminalOutput }; - } else { - return { success: true, terminalOutput }; - } - } else { - const r: { - result: { success: boolean; terminalOutput: string }; - command: string; - }[] = await Promise.all(procs); - terminalOutput += r.map((f) => f.result.terminalOutput).join(''); - const failed = r.filter((v) => !v.result.success); - if (failed.length > 0) { - const output = failed - .map( - (f) => - `Warning: command "${f.command}" exited with non-zero status code` - ) - .join('\r\n'); - terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { - success: false, - terminalOutput, - }; - } else { - return { - success: true, - terminalOutput, - }; - } - } -} - function normalizeOptions( options: RunCommandsOptions ): NormalizedRunCommandsOptions { @@ -279,241 +207,6 @@ function normalizeOptions( return options as NormalizedRunCommandsOptions; } -async function runSerially( - options: NormalizedRunCommandsOptions, - context: ExecutorContext -): Promise<{ success: boolean; terminalOutput: string }> { - pseudoTerminal ??= PseudoTerminal.isSupported() ? getPseudoTerminal() : null; - let terminalOutput = ''; - for (const c of options.commands) { - const result: { success: boolean; terminalOutput: string } = - await createProcess( - pseudoTerminal, - c, - [], - options.color, - calculateCwd(options.cwd, context), - options.processEnv ?? options.env ?? {}, - false, - options.usePty, - options.streamOutput, - options.tty, - options.envFile - ); - terminalOutput += result.terminalOutput; - if (!result.success) { - const output = `Warning: command "${c.command}" exited with non-zero status code`; - result.terminalOutput += output; - if (options.streamOutput) { - process.stderr.write(output); - } - return { success: false, terminalOutput }; - } - } - return { success: true, terminalOutput }; -} - -async function createProcess( - pseudoTerminal: PseudoTerminal | null, - commandConfig: { - command: string; - color?: string; - bgColor?: string; - prefix?: string; - prefixColor?: string; - }, - readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], - color: boolean, - cwd: string, - env: Record, - isParallel: boolean, - usePty: boolean = true, - streamOutput: boolean = true, - tty: boolean, - envFile?: string -): Promise<{ success: boolean; terminalOutput: string }> { - env = processEnv(color, cwd, env, envFile); - // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes - // currently does not work properly in windows - if ( - pseudoTerminal && - process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && - !commandConfig.prefix && - readyWhenStatus.length === 0 && - !isParallel && - usePty - ) { - let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; - if (streamOutput) { - process.stdout.write(terminalOutput); - } - - const cp = pseudoTerminal.runCommand(commandConfig.command, { - cwd, - jsEnv: env, - quiet: !streamOutput, - tty, - }); - - childProcesses.add(cp); - - return new Promise((res) => { - cp.onOutput((output) => { - terminalOutput += output; - }); - - cp.onExit((code) => { - if (code >= 128) { - process.exit(code); - } else { - res({ success: code === 0, terminalOutput }); - } - }); - }); - } - - return nodeProcess(commandConfig, cwd, env, readyWhenStatus, streamOutput); -} - -function nodeProcess( - commandConfig: { - command: string; - color?: string; - bgColor?: string; - prefix?: string; - prefixColor?: string; - }, - cwd: string, - env: Record, - readyWhenStatus: { stringToMatch: string; found: boolean }[], - streamOutput = true -): Promise<{ success: boolean; terminalOutput: string }> { - let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; - if (streamOutput) { - process.stdout.write(terminalOutput); - } - return new Promise((res) => { - const childProcess = exec(commandConfig.command, { - maxBuffer: LARGE_BUFFER, - env, - cwd, - windowsHide: false, - }); - - childProcesses.add(childProcess); - - childProcess.stdout.on('data', (data) => { - const output = addColorAndPrefix(data, commandConfig); - terminalOutput += output; - if (streamOutput) { - process.stdout.write(output); - } - if (readyWhenStatus.length && isReady(readyWhenStatus, data.toString())) { - res({ success: true, terminalOutput }); - } - }); - childProcess.stderr.on('data', (err) => { - const output = addColorAndPrefix(err, commandConfig); - terminalOutput += output; - if (streamOutput) { - process.stderr.write(output); - } - if (readyWhenStatus.length && isReady(readyWhenStatus, err.toString())) { - res({ success: true, terminalOutput }); - } - }); - childProcess.on('error', (err) => { - const ouptput = addColorAndPrefix(err.toString(), commandConfig); - terminalOutput += ouptput; - if (streamOutput) { - process.stderr.write(ouptput); - } - res({ success: false, terminalOutput }); - }); - childProcess.on('exit', (code) => { - childProcesses.delete(childProcess); - if (!readyWhenStatus.length || isReady(readyWhenStatus)) { - res({ success: code === 0, terminalOutput }); - } - }); - }); -} - -function addColorAndPrefix( - out: string, - config: { - prefix?: string; - prefixColor?: string; - color?: string; - bgColor?: string; - } -) { - if (config.prefix) { - out = out - .split('\n') - .map((l) => { - let prefixText = config.prefix; - if (config.prefixColor && chalk[config.prefixColor]) { - prefixText = chalk[config.prefixColor](prefixText); - } - prefixText = chalk.bold(prefixText); - return l.trim().length > 0 ? `${prefixText} ${l}` : l; - }) - .join('\n'); - } - if (config.color && chalk[config.color]) { - out = chalk[config.color](out); - } - if (config.bgColor && chalk[config.bgColor]) { - out = chalk[config.bgColor](out); - } - return out; -} - -function calculateCwd( - cwd: string | undefined, - context: ExecutorContext -): string { - if (!cwd) return context.root; - if (path.isAbsolute(cwd)) return cwd; - return path.join(context.root, cwd); -} - -/** - * Env variables are processed in the following order: - * - env option from executor options - * - env file from envFile option if provided - * - local env variables - */ -function processEnv( - color: boolean, - cwd: string, - envOptionFromExecutor: Record, - envFile?: string -) { - let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() }); - localEnv = { - ...process.env, - ...localEnv, - }; - - if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) { - loadEnvVarsFile(envFile, localEnv); - } - let res: Record = { - ...localEnv, - ...envOptionFromExecutor, - }; - // need to override PATH to make sure we are using the local node_modules - if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like - if (localEnv.Path) res.Path = localEnv.Path; // Windows - - if (color) { - res.FORCE_COLOR = `${color}`; - } - return res; -} - export function interpolateArgsIntoCommand( command: string, opts: Pick< @@ -631,7 +324,10 @@ function filterPropKeysFromUnParsedOptions( let registered = false; -function registerProcessListener() { +function registerProcessListener( + runningTask: ParallelRunningTasks | SeriallyRunningTasks, + pseudoTerminal?: PseudoTerminal +) { if (registered) { return; } @@ -644,45 +340,25 @@ function registerProcessListener() { pseudoTerminal.sendMessageToChildren(message); } - childProcesses.forEach((p) => { - if ('connected' in p && p.connected) { - p.send(message); - } - }); + runningTask.send(message); }); // Terminate any task processes on exit process.on('exit', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill(); - } - }); + runningTask.kill(); }); process.on('SIGINT', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // we exit here because we don't need to write anything to cache. process.exit(signalToCode('SIGINT')); }); process.on('SIGTERM', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); process.on('SIGHUP', () => { - childProcesses.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } - }); + runningTask.kill('SIGTERM'); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); @@ -705,19 +381,3 @@ function wrapArgIntoQuotesIfNeeded(arg: string): string { return arg; } } - -function isReady( - readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], - data?: string -): boolean { - if (data) { - for (const readyWhenElement of readyWhenStatus) { - if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) { - readyWhenElement.found = true; - break; - } - } - } - - return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found); -} diff --git a/packages/nx/src/executors/run-commands/running-tasks.ts b/packages/nx/src/executors/run-commands/running-tasks.ts new file mode 100644 index 0000000000000..216d943867f89 --- /dev/null +++ b/packages/nx/src/executors/run-commands/running-tasks.ts @@ -0,0 +1,519 @@ +import { ChildProcess, exec, Serializable } from 'child_process'; +import { RunningTask } from '../../tasks-runner/running-tasks/running-task'; +import { ExecutorContext } from '../../config/misc-interfaces'; +import { + LARGE_BUFFER, + NormalizedRunCommandsOptions, + RunCommandsCommandOptions, + RunCommandsOptions, +} from './run-commands.impl'; +import { + PseudoTerminal, + PseudoTtyProcess, +} from '../../tasks-runner/pseudo-terminal'; +import { isAbsolute, join } from 'path'; +import * as chalk from 'chalk'; +import { env as appendLocalEnv } from 'npm-run-path'; +import { + loadAndExpandDotEnvFile, + unloadDotEnvFile, +} from '../../tasks-runner/task-env'; +import * as treeKill from 'tree-kill'; + +export class ParallelRunningTasks implements RunningTask { + private readonly childProcesses: RunningNodeProcess[]; + private readyWhenStatus: { stringToMatch: string; found: boolean }[]; + private readonly streamOutput: boolean; + + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + + constructor(options: NormalizedRunCommandsOptions, context: ExecutorContext) { + this.childProcesses = options.commands.map( + (commandConfig) => + new RunningNodeProcess( + commandConfig, + options.color, + calculateCwd(options.cwd, context), + options.env ?? {}, + options.readyWhenStatus, + options.streamOutput, + options.envFile + ) + ); + this.readyWhenStatus = options.readyWhenStatus; + this.streamOutput = options.streamOutput; + + this.run(); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + for (const childProcess of this.childProcesses) { + childProcess.send(message); + } + } + + async kill(signal?: NodeJS.Signals | number) { + await Promise.all( + this.childProcesses.map(async (p) => { + try { + return p.kill(); + } catch (e) { + console.error(`Unable to terminate "${p.command}"\nError:`, e); + } + }) + ); + } + + private async run() { + if (this.readyWhenStatus.length) { + let { + childProcess, + result: { code, terminalOutput }, + } = await Promise.race( + this.childProcesses.map( + (childProcess) => + new Promise<{ + childProcess: RunningNodeProcess; + result: { code: number; terminalOutput: string }; + }>((res) => { + childProcess.onExit((code, terminalOutput) => { + res({ + childProcess, + result: { code, terminalOutput }, + }); + }); + }) + ) + ); + + if (code !== 0) { + const output = `Warning: command "${childProcess.command}" exited with non-zero status code`; + terminalOutput += output; + if (this.streamOutput) { + process.stderr.write(output); + } + } + + for (const cb of this.exitCallbacks) { + cb(code, terminalOutput); + } + } else { + const results = await Promise.all( + this.childProcesses.map((childProcess) => + childProcess.getResults().then((result) => ({ + childProcess, + result, + })) + ) + ); + + let terminalOutput = results + .map((r) => r.result.terminalOutput) + .join('\r\n'); + + const failed = results.filter((result) => result.result.code !== 0); + if (failed.length > 0) { + const output = failed + .map( + (failedResult) => + `Warning: command "${failedResult.childProcess.command}" exited with non-zero status code` + ) + .join('\r\n'); + terminalOutput += output; + if (this.streamOutput) { + process.stderr.write(output); + } + + for (const cb of this.exitCallbacks) { + cb(1, terminalOutput); + } + } else { + for (const cb of this.exitCallbacks) { + cb(0, terminalOutput); + } + } + } + } +} + +export class SeriallyRunningTasks implements RunningTask { + private terminalOutput = ''; + private currentProcess: RunningTask | PseudoTtyProcess | null = null; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + private code: number | null = 0; + private error: any; + + constructor( + options: NormalizedRunCommandsOptions, + context: ExecutorContext, + private pseudoTerminal?: PseudoTerminal + ) { + this.run(options, context) + .catch((e) => { + this.error = e; + }) + .finally(() => { + for (const cb of this.exitCallbacks) { + cb(this.code, this.terminalOutput); + } + }); + } + + getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res, rej) => { + this.onExit((code) => { + if (this.error) { + rej(this.error); + } else { + res({ code, terminalOutput: this.terminalOutput }); + } + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + throw new Error('Not implemented'); + } + + kill(signal?: NodeJS.Signals | number) { + return this.currentProcess.kill(signal); + } + + private async run( + options: NormalizedRunCommandsOptions, + context: ExecutorContext + ) { + for (const c of options.commands) { + const childProcess = await this.createProcess( + c, + [], + options.color, + calculateCwd(options.cwd, context), + options.processEnv ?? options.env ?? {}, + false, + options.usePty, + options.streamOutput, + options.tty, + options.envFile + ); + this.currentProcess = childProcess; + + let { code, terminalOutput } = await childProcess.getResults(); + this.terminalOutput += terminalOutput; + this.code = code; + if (code !== 0) { + const output = `Warning: command "${c.command}" exited with non-zero status code`; + terminalOutput += output; + if (options.streamOutput) { + process.stderr.write(output); + } + this.terminalOutput += terminalOutput; + + // Stop running commands + break; + } + } + } + + private async createProcess( + commandConfig: RunCommandsCommandOptions, + readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], + color: boolean, + cwd: string, + env: Record, + isParallel: boolean, + usePty: boolean = true, + streamOutput: boolean = true, + tty: boolean, + envFile?: string + ): Promise { + // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes + // currently does not work properly in windows + if ( + this.pseudoTerminal && + process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && + !commandConfig.prefix && + readyWhenStatus.length === 0 && + !isParallel && + usePty + ) { + return createProcessWithPseudoTty( + this.pseudoTerminal, + commandConfig, + color, + cwd, + env, + streamOutput, + tty, + envFile + ); + } + + return new RunningNodeProcess( + commandConfig, + color, + cwd, + env, + readyWhenStatus, + streamOutput, + envFile + ); + } +} + +class RunningNodeProcess implements RunningTask { + private terminalOutput = ''; + private childProcess: ChildProcess; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + public command: string; + + constructor( + commandConfig: RunCommandsCommandOptions, + color: boolean, + cwd: string, + env: Record, + private readyWhenStatus: { stringToMatch: string; found: boolean }[], + streamOutput = true, + envFile: string + ) { + env = processEnv(color, cwd, env, envFile); + this.command = commandConfig.command; + this.terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; + if (streamOutput) { + process.stdout.write(this.terminalOutput); + } + this.childProcess = exec(commandConfig.command, { + maxBuffer: LARGE_BUFFER, + env, + cwd, + windowsHide: false, + }); + + this.addListeners(commandConfig, streamOutput); + } + + getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void): void { + this.exitCallbacks.push(cb); + } + + send(message: Serializable): void { + this.childProcess.send(message); + } + + kill(signal?: NodeJS.Signals | number): Promise { + return new Promise((res, rej) => { + treeKill(this.childProcess.pid, signal, (err) => { + if (err) { + rej(err); + } else { + res(); + } + }); + }); + } + + private addListeners( + commandConfig: RunCommandsCommandOptions, + streamOutput: boolean + ) { + this.childProcess.stdout.on('data', (data) => { + const output = addColorAndPrefix(data, commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stdout.write(output); + } + if ( + this.readyWhenStatus.length && + isReady(this.readyWhenStatus, data.toString()) + ) { + for (const cb of this.exitCallbacks) { + cb(0, this.terminalOutput); + } + } + }); + this.childProcess.stderr.on('data', (err) => { + const output = addColorAndPrefix(err, commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stderr.write(output); + } + if ( + this.readyWhenStatus.length && + isReady(this.readyWhenStatus, err.toString()) + ) { + for (const cb of this.exitCallbacks) { + cb(1, this.terminalOutput); + } + } + }); + this.childProcess.on('error', (err) => { + const output = addColorAndPrefix(err.toString(), commandConfig); + this.terminalOutput += output; + if (streamOutput) { + process.stderr.write(output); + } + for (const cb of this.exitCallbacks) { + cb(1, this.terminalOutput); + } + }); + this.childProcess.on('exit', (code) => { + if (!this.readyWhenStatus.length || isReady(this.readyWhenStatus)) { + for (const cb of this.exitCallbacks) { + cb(code, this.terminalOutput); + } + } + }); + } +} + +async function createProcessWithPseudoTty( + pseudoTerminal: PseudoTerminal, + commandConfig: { + command: string; + color?: string; + bgColor?: string; + prefix?: string; + }, + color: boolean, + cwd: string, + env: Record, + streamOutput: boolean = true, + tty: boolean, + envFile?: string +) { + let terminalOutput = chalk.dim('> ') + commandConfig.command + '\r\n\r\n'; + if (streamOutput) { + process.stdout.write(terminalOutput); + } + env = processEnv(color, cwd, env, envFile); + const childProcess = pseudoTerminal.runCommand(commandConfig.command, { + cwd, + jsEnv: env, + quiet: !streamOutput, + tty, + }); + + childProcess.onOutput((output) => { + terminalOutput += output; + }); + + return childProcess; +} + +function addColorAndPrefix(out: string, config: RunCommandsCommandOptions) { + if (config.prefix) { + out = out + .split('\n') + .map((l) => { + let prefixText = config.prefix; + if (config.prefixColor && chalk[config.prefixColor]) { + prefixText = chalk[config.prefixColor](prefixText); + } + prefixText = chalk.bold(prefixText); + return l.trim().length > 0 ? `${prefixText} ${l}` : l; + }) + .join('\n'); + } + if (config.color && chalk[config.color]) { + out = chalk[config.color](out); + } + if (config.bgColor && chalk[config.bgColor]) { + out = chalk[config.bgColor](out); + } + return out; +} + +function calculateCwd( + cwd: string | undefined, + context: ExecutorContext +): string { + if (!cwd) return context.root; + if (isAbsolute(cwd)) return cwd; + return join(context.root, cwd); +} + +/** + * Env variables are processed in the following order: + * - env option from executor options + * - env file from envFile option if provided + * - local env variables + */ +function processEnv( + color: boolean, + cwd: string, + envOptionFromExecutor: Record, + envFile?: string +) { + let localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() }); + localEnv = { + ...process.env, + ...localEnv, + }; + + if (process.env.NX_LOAD_DOT_ENV_FILES !== 'false' && envFile) { + loadEnvVarsFile(envFile, localEnv); + } + let res: Record = { + ...localEnv, + ...envOptionFromExecutor, + }; + // need to override PATH to make sure we are using the local node_modules + if (localEnv.PATH) res.PATH = localEnv.PATH; // UNIX-like + if (localEnv.Path) res.Path = localEnv.Path; // Windows + + if (color) { + res.FORCE_COLOR = `${color}`; + } + return res; +} + +function isReady( + readyWhenStatus: { stringToMatch: string; found: boolean }[] = [], + data?: string +): boolean { + if (data) { + for (const readyWhenElement of readyWhenStatus) { + if (data.toString().indexOf(readyWhenElement.stringToMatch) > -1) { + readyWhenElement.found = true; + break; + } + } + } + + return readyWhenStatus.every((readyWhenElement) => readyWhenElement.found); +} + +function loadEnvVarsFile(path: string, env: Record = {}) { + unloadDotEnvFile(path, env); + const result = loadAndExpandDotEnvFile(path, env); + if (result.error) { + throw result.error; + } +} diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index 59321395f80e7..8afdb0340d8be 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -11,6 +11,7 @@ export declare class ChildProcess { kill(): void onExit(callback: (message: string) => void): void onOutput(callback: (message: string) => void): void + cleanup(): void } export declare class FileLock { diff --git a/packages/nx/src/native/pseudo_terminal/child_process.rs b/packages/nx/src/native/pseudo_terminal/child_process.rs index be4deb401ec13..3b37f2c3fa65c 100644 --- a/packages/nx/src/native/pseudo_terminal/child_process.rs +++ b/packages/nx/src/native/pseudo_terminal/child_process.rs @@ -1,11 +1,13 @@ -use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use crossbeam_channel::{bounded, Receiver}; use napi::{ - threadsafe_function::{ - ErrorStrategy::Fatal, ThreadsafeFunction, ThreadsafeFunctionCallMode::NonBlocking, - }, - Env, JsFunction, + threadsafe_function::{ + ErrorStrategy::Fatal, ThreadsafeFunction, ThreadsafeFunctionCallMode::NonBlocking, + }, + Env, JsFunction, }; use portable_pty::ChildKiller; +use tracing::warn; pub enum ChildProcessMessage { Kill, @@ -16,6 +18,7 @@ pub struct ChildProcess { process_killer: Box, message_receiver: Receiver, pub(crate) wait_receiver: Receiver, + thread_handles: Vec>, } #[napi] impl ChildProcess { @@ -28,6 +31,7 @@ impl ChildProcess { process_killer, message_receiver, wait_receiver: exit_receiver, + thread_handles: vec![], } } @@ -68,18 +72,43 @@ impl ChildProcess { callback_tsfn.unref(&env)?; + let (kill_tx, kill_rx) = bounded::<()>(1); + std::thread::spawn(move || { - while let Ok(content) = rx.recv() { - // windows will add `ESC[6n` to the beginning of the output, - // we dont want to store this ANSI code in cache, because replays will cause issues - // remove it before sending it to js - #[cfg(windows)] - let content = content.replace("\x1B[6n", ""); - - callback_tsfn.call(content, NonBlocking); + loop { + if kill_rx.try_recv().is_ok() { + break; + } + + if let Ok(content) = rx.try_recv() { + // windows will add `ESC[6n` to the beginning of the output, + // we dont want to store this ANSI code in cache, because replays will cause issues + // remove it before sending it to js + #[cfg(windows)] + let content = content.replace("\x1B[6n", ""); + callback_tsfn.call(content, NonBlocking); + } } }); + self.thread_handles.push(kill_tx); + Ok(()) } + + #[napi] + pub fn cleanup(&mut self) { + let handles = std::mem::take(&mut self.thread_handles); + for handle in handles { + if let Err(e) = handle.send(()) { + warn!(error = ?e, "Failed to send kill signal to thread"); + } + } + } +} + +impl Drop for ChildProcess { + fn drop(&mut self) { + self.cleanup(); + } } diff --git a/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs b/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs index 001e7357c2d15..61cd28143f8bf 100644 --- a/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs +++ b/packages/nx/src/native/pseudo_terminal/pseudo_terminal.rs @@ -159,9 +159,9 @@ pub fn run_command( } let (exit_to_process_tx, exit_to_process_rx) = bounded(1); + trace!("Running {}", command); let mut child = pair.slave.spawn_command(cmd)?; pseudo_terminal.running.store(true, Ordering::SeqCst); - trace!("Running {}", command); let is_tty = tty.unwrap_or_else(|| std::io::stdout().is_tty()); if is_tty { trace!("Enabling raw mode"); diff --git a/packages/nx/src/tasks-runner/create-task-graph.spec.ts b/packages/nx/src/tasks-runner/create-task-graph.spec.ts index 4fa5768a29f9e..059f2aa960cd0 100644 --- a/packages/nx/src/tasks-runner/create-task-graph.spec.ts +++ b/packages/nx/src/tasks-runner/create-task-graph.spec.ts @@ -48,6 +48,7 @@ describe('createTaskGraph', () => { executor: 'nx:run-commands', }, serve: { + continuous: true, executor: 'nx:run-commands', }, }, @@ -90,6 +91,7 @@ describe('createTaskGraph', () => { roots: [], tasks: {}, dependencies: {}, + continuousDependencies: {}, }); }); @@ -117,11 +119,15 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); const twoTasks = createTaskGraph( @@ -148,6 +154,7 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:test': { id: 'lib1:test', @@ -159,12 +166,17 @@ describe('createTaskGraph', () => { overrides: { a: 123 }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'lib1:test': [], + }, }); }); @@ -299,6 +311,7 @@ describe('createTaskGraph', () => { overrides: {}, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -312,12 +325,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { 'lib1:compile:libDefault': ['lib2:compile'], 'lib2:compile': [], }, + continuousDependencies: { + 'lib1:compile:libDefault': [], + 'lib2:compile': [], + }, }); const compileApp = createTaskGraph( @@ -343,6 +361,7 @@ describe('createTaskGraph', () => { overrides: {}, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile:libDefault': { id: 'lib1:compile:libDefault', @@ -357,6 +376,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile:ci': { id: 'lib2:compile:ci', @@ -371,6 +391,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -378,6 +399,11 @@ describe('createTaskGraph', () => { 'lib1:compile:libDefault': ['lib2:compile:ci'], 'lib2:compile:ci': [], }, + continuousDependencies: { + 'app1:compile:ci': [], + 'lib1:compile:libDefault': [], + 'lib2:compile:ci': [], + }, }); }); @@ -460,6 +486,10 @@ describe('createTaskGraph', () => { 'app1:compile': ['lib3:compile'], 'lib3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'lib3:compile': [], + }, roots: ['lib3:compile'], tasks: { 'app1:compile': { @@ -472,6 +502,7 @@ describe('createTaskGraph', () => { target: 'compile', }, parallelism: true, + continuous: false, }, 'lib3:compile': { id: 'lib3:compile', @@ -485,6 +516,7 @@ describe('createTaskGraph', () => { target: 'compile', }, parallelism: true, + continuous: false, }, }, }); @@ -514,11 +546,15 @@ describe('createTaskGraph', () => { overrides: { a: '--value=app1-root' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); }); @@ -546,11 +582,15 @@ describe('createTaskGraph', () => { overrides: { a: '--base-href=/app1-root${deploymentId}' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:test': [], }, + continuousDependencies: { + 'app1:test': [], + }, }); }); @@ -661,6 +701,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -672,6 +713,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -683,6 +725,7 @@ describe('createTaskGraph', () => { overrides: { myFlag: 'flag value' }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -694,6 +737,7 @@ describe('createTaskGraph', () => { overrides: { __overrides_unparsed__: [] }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -702,6 +746,12 @@ describe('createTaskGraph', () => { 'lib1:compile': ['lib2:compile'], 'lib2:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'lib1:compile': [], + 'lib2:compile': [], + }, }); }); @@ -732,6 +782,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -745,6 +796,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile2': { id: 'app1:precompile2', @@ -758,6 +810,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -771,6 +824,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -779,6 +833,127 @@ describe('createTaskGraph', () => { 'app1:precompile2': [], 'lib1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'app1:precompile2': [], + 'lib1:compile': [], + }, + }); + }); + + it('should create graphs with continuous dependencies', () => { + projectGraph.nodes['app1'].data.targets['serve'].dependsOn = [ + { + dependencies: true, + target: 'serve', + }, + { + target: 'compile', + }, + ]; + projectGraph.nodes['app1'].data.targets['compile'].dependsOn = [ + { + dependencies: true, + target: 'compile', + }, + ]; + projectGraph.nodes['lib1'].data.targets['serve'] = { + executor: 'nx:run-command', + continuous: true, + dependsOn: [ + { + dependencies: true, + target: 'serve', + }, + { + target: 'compile', + }, + ], + }; + const taskGraph = createTaskGraph( + projectGraph, + {}, + ['app1'], + ['serve'], + undefined, + { + __overrides_unparsed__: [], + } + ); + // precompile should also be in here + expect(taskGraph).toEqual({ + roots: ['lib1:compile'], + tasks: { + 'app1:serve': { + id: 'app1:serve', + target: { + project: 'app1', + target: 'serve', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'app1-root', + parallelism: true, + continuous: true, + }, + 'app1:compile': { + id: 'app1:compile', + target: { + project: 'app1', + target: 'compile', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'app1-root', + parallelism: true, + continuous: false, + }, + 'lib1:serve': { + id: 'lib1:serve', + target: { + project: 'lib1', + target: 'serve', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'lib1-root', + parallelism: true, + continuous: true, + }, + 'lib1:compile': { + id: 'lib1:compile', + target: { + project: 'lib1', + target: 'compile', + }, + outputs: [], + overrides: { + __overrides_unparsed__: [], + }, + projectRoot: 'lib1-root', + parallelism: true, + continuous: false, + }, + }, + dependencies: { + 'app1:serve': ['app1:compile'], + 'app1:compile': ['lib1:compile'], + 'lib1:serve': ['lib1:compile'], + 'lib1:compile': [], + }, + continuousDependencies: { + 'app1:serve': ['lib1:serve'], + 'app1:compile': [], + 'lib1:serve': [], + 'lib1:compile': [], + }, }); }); @@ -809,6 +984,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile': { id: 'app1:precompile', @@ -822,6 +998,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:precompile2': { id: 'app1:precompile2', @@ -835,6 +1012,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -848,6 +1026,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -856,6 +1035,12 @@ describe('createTaskGraph', () => { 'app1:precompile2': [], 'lib1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:precompile': [], + 'app1:precompile2': [], + 'lib1:compile': [], + }, }); }); @@ -955,6 +1140,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib1:compile': { id: 'lib1:compile', @@ -968,6 +1154,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }, 'lib2:compile': { id: 'lib2:compile', @@ -981,6 +1168,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, 'lib3:compile': { id: 'lib3:compile', @@ -994,6 +1182,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib3-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -1002,6 +1191,12 @@ describe('createTaskGraph', () => { 'lib2:compile': ['lib3:compile'], 'lib3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'lib1:compile': [], + 'lib2:compile': [], + 'lib3:compile': [], + }, }); }); @@ -1118,6 +1313,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, 'app2:compile': { id: 'app2:compile', @@ -1126,6 +1322,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { __overrides_unparsed__: [] }, parallelism: true, + continuous: false, }, 'coreInfra:apply': { id: 'coreInfra:apply', @@ -1134,6 +1331,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, 'app1:compile': { id: 'app1:compile', @@ -1142,6 +1340,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { __overrides_unparsed__: [] }, parallelism: true, + continuous: false, }, 'infra2:apply': { id: 'infra2:apply', @@ -1150,6 +1349,7 @@ describe('createTaskGraph', () => { outputs: [], overrides: { myFlag: 'flag value' }, parallelism: true, + continuous: false, }, }, dependencies: { @@ -1164,6 +1364,13 @@ describe('createTaskGraph', () => { 'app1:compile': [], 'infra2:apply': ['app2:compile', 'coreInfra:apply'], }, + continuousDependencies: { + 'infra1:apply': [], + 'app2:compile': [], + 'coreInfra:apply': [], + 'app1:compile': [], + 'infra2:apply': [], + }, }); }); @@ -1217,6 +1424,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:test': { id: 'app1:test', @@ -1230,12 +1438,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app1:test'], 'app1:test': ['app1:compile'], }, + continuousDependencies: { + 'app1:compile': [], + 'app1:test': [], + }, }); }); @@ -1326,6 +1539,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1339,6 +1553,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib3:build': expect.objectContaining({ id: 'lib3:build', @@ -1352,6 +1567,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib3-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1365,6 +1581,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1373,6 +1590,12 @@ describe('createTaskGraph', () => { 'lib3:build': ['lib4:build'], 'lib4:build': ['lib1:build'], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib3:build': [], + 'lib4:build': [], + }, }); }); @@ -1458,6 +1681,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1471,6 +1695,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1484,6 +1709,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1491,6 +1717,11 @@ describe('createTaskGraph', () => { 'lib2:build': ['lib4:build'], 'lib4:build': [], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib4:build': [], + }, }); }); @@ -1551,11 +1782,15 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), }, dependencies: { 'lib1:build': [], }, + continuousDependencies: { + 'lib1:build': [], + }, }); }); @@ -1642,6 +1877,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1655,6 +1891,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), 'lib4:build': expect.objectContaining({ id: 'lib4:build', @@ -1668,6 +1905,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib4-root', parallelism: true, + continuous: false, }), }, dependencies: { @@ -1675,6 +1913,11 @@ describe('createTaskGraph', () => { 'lib2:build': [], 'lib4:build': ['lib1:build'], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + 'lib4:build': [], + }, }); }); @@ -1757,6 +2000,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib1-root', parallelism: true, + continuous: false, }), 'lib2:build': expect.objectContaining({ id: 'lib2:build', @@ -1770,12 +2014,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }), }, dependencies: { 'lib1:build': ['lib2:build'], 'lib2:build': [], }, + continuousDependencies: { + 'lib1:build': [], + 'lib2:build': [], + }, }); }); @@ -1852,6 +2101,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -1865,12 +2115,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], 'app3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app3:compile': [], + }, }); }); @@ -1944,6 +2199,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -1957,12 +2213,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], 'app3:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app3:compile': [], + }, }); }); @@ -2042,6 +2303,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app1:test': { id: 'app1:test', @@ -2055,6 +2317,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'lib2:dep': { id: 'lib2:dep', @@ -2068,6 +2331,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, 'lib2:dep2': { id: 'lib2:dep2', @@ -2081,6 +2345,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'lib2-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -2089,6 +2354,12 @@ describe('createTaskGraph', () => { 'lib2:dep': [], 'lib2:dep2': [], }, + continuousDependencies: { + 'app1:lint': [], + 'app1:test': [], + 'lib2:dep': [], + 'lib2:dep2': [], + }, }); }); @@ -2160,11 +2431,15 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + }, }); const taskGraph2 = createTaskGraph( @@ -2194,6 +2469,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app2:compile': { id: 'app2:compile', @@ -2207,12 +2483,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app2-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app2:compile'], 'app2:compile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app2:compile': [], + }, }); }); @@ -2706,6 +2987,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app1-root', parallelism: true, + continuous: false, }, 'app4:precompile': { id: 'app4:precompile', @@ -2719,12 +3001,17 @@ describe('createTaskGraph', () => { }, projectRoot: 'app4-root', parallelism: true, + continuous: false, }, }, dependencies: { 'app1:compile': ['app4:precompile'], 'app4:precompile': [], }, + continuousDependencies: { + 'app1:compile': [], + 'app4:precompile': [], + }, }); taskGraph = createTaskGraph( @@ -2752,6 +3039,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app2-root', parallelism: true, + continuous: false, }, 'app3:compile': { id: 'app3:compile', @@ -2765,6 +3053,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app3-root', parallelism: true, + continuous: false, }, 'app4:precompile': { id: 'app4:precompile', @@ -2778,6 +3067,7 @@ describe('createTaskGraph', () => { }, projectRoot: 'app4-root', parallelism: true, + continuous: false, }, }, dependencies: { @@ -2785,6 +3075,11 @@ describe('createTaskGraph', () => { 'app3:compile': ['app4:precompile'], 'app4:precompile': [], }, + continuousDependencies: { + 'app2:compile': [], + 'app3:compile': [], + 'app4:precompile': [], + }, }); }); diff --git a/packages/nx/src/tasks-runner/create-task-graph.ts b/packages/nx/src/tasks-runner/create-task-graph.ts index bb97f6e06eff1..30a8c2b335244 100644 --- a/packages/nx/src/tasks-runner/create-task-graph.ts +++ b/packages/nx/src/tasks-runner/create-task-graph.ts @@ -16,6 +16,7 @@ export class ProcessTasks { private readonly seen = new Set(); readonly tasks: { [id: string]: Task } = {}; readonly dependencies: { [k: string]: string[] } = {}; + readonly continuousDependencies: { [k: string]: string[] } = {}; private readonly allTargetNames: string[]; constructor( @@ -58,6 +59,7 @@ export class ProcessTasks { ); this.tasks[task.id] = task; this.dependencies[task.id] = []; + this.continuousDependencies[task.id] = []; } } } @@ -75,6 +77,7 @@ export class ProcessTasks { if (!initialTasks[t]) { delete this.tasks[t]; delete this.dependencies[t]; + delete this.continuousDependencies[t]; } } for (let d of Object.keys(this.dependencies)) { @@ -82,6 +85,11 @@ export class ProcessTasks { (dd) => !!initialTasks[dd] ); } + for (let d of Object.keys(this.continuousDependencies)) { + this.continuousDependencies[d] = this.continuousDependencies[d].filter( + (dd) => !!initialTasks[dd] + ); + } } filterDummyTasks(this.dependencies); @@ -96,8 +104,22 @@ export class ProcessTasks { } } - return Object.keys(this.dependencies).filter( - (d) => this.dependencies[d].length === 0 + filterDummyTasks(this.continuousDependencies); + + for (const taskId of Object.keys(this.continuousDependencies)) { + if (this.continuousDependencies[taskId].length > 0) { + this.continuousDependencies[taskId] = [ + ...new Set( + this.continuousDependencies[taskId].filter((d) => d !== taskId) + ).values(), + ]; + } + } + + return Object.keys(this.tasks).filter( + (d) => + this.dependencies[d].length === 0 && + this.continuousDependencies[d].length === 0 ); } @@ -204,9 +226,6 @@ export class ProcessTasks { dependencyConfig.target, resolvedConfiguration ); - if (task.id !== selfTaskId) { - this.dependencies[task.id].push(selfTaskId); - } if (!this.tasks[selfTaskId]) { const newTask = this.createTask( selfTaskId, @@ -217,6 +236,7 @@ export class ProcessTasks { ); this.tasks[selfTaskId] = newTask; this.dependencies[selfTaskId] = []; + this.continuousDependencies[selfTaskId] = []; this.processTask( newTask, newTask.target.project, @@ -224,6 +244,13 @@ export class ProcessTasks { overrides ); } + if (task.id !== selfTaskId) { + if (this.tasks[selfTaskId].continuous) { + this.continuousDependencies[task.id].push(selfTaskId); + } else { + this.dependencies[task.id].push(selfTaskId); + } + } } } @@ -265,8 +292,17 @@ export class ProcessTasks { resolvedConfiguration ); + const depTargetConfiguration = + this.projectGraph.nodes[depProject.name].data.targets[ + dependencyConfig.target + ]; + if (task.id !== depTargetId) { - this.dependencies[task.id].push(depTargetId); + if (depTargetConfiguration.continuous) { + this.continuousDependencies[task.id].push(depTargetId); + } else { + this.dependencies[task.id].push(depTargetId); + } } if (!this.tasks[depTargetId]) { const newTask = this.createTask( @@ -278,6 +314,7 @@ export class ProcessTasks { ); this.tasks[depTargetId] = newTask; this.dependencies[depTargetId] = []; + this.continuousDependencies[depTargetId] = []; this.processTask( newTask, @@ -298,6 +335,7 @@ export class ProcessTasks { ); this.dependencies[task.id].push(dummyId); this.dependencies[dummyId] ??= []; + this.continuousDependencies[dummyId] ??= []; const noopTask = this.createDummyTask(dummyId, task); this.processTask(noopTask, depProject.name, configuration, overrides); } @@ -354,6 +392,7 @@ export class ProcessTasks { ), cache: project.data.targets[target].cache, parallelism: project.data.targets[target].parallelism ?? true, + continuous: project.data.targets[target].continuous ?? false, }; } @@ -405,6 +444,7 @@ export function createTaskGraph( roots, tasks: p.tasks, dependencies: p.dependencies, + continuousDependencies: p.continuousDependencies, }; } diff --git a/packages/nx/src/tasks-runner/forked-process-task-runner.ts b/packages/nx/src/tasks-runner/forked-process-task-runner.ts index a58eab0d46b85..d719e01aee3f9 100644 --- a/packages/nx/src/tasks-runner/forked-process-task-runner.ts +++ b/packages/nx/src/tasks-runner/forked-process-task-runner.ts @@ -1,25 +1,25 @@ -import { readFileSync, writeFileSync } from 'fs'; -import { ChildProcess, fork, Serializable } from 'child_process'; -import * as chalk from 'chalk'; +import { writeFileSync } from 'fs'; +import { fork, Serializable } from 'child_process'; import { DefaultTasksRunnerOptions } from './default-tasks-runner'; import { output } from '../utils/output'; import { getCliPath, getPrintableCommandArgsForTask } from './utils'; import { Batch } from './tasks-schedule'; import { join } from 'path'; -import { - BatchMessage, - BatchMessageType, - BatchResults, -} from './batch/batch-messages'; +import { BatchMessageType } from './batch/batch-messages'; import { stripIndents } from '../utils/strip-indents'; import { Task, TaskGraph } from '../config/task-graph'; -import { Transform } from 'stream'; import { - PseudoTtyProcess, getPseudoTerminal, PseudoTerminal, + PseudoTtyProcess, } from './pseudo-terminal'; import { signalToCode } from '../utils/exit-codes'; +import { + NodeChildProcessWithDirectOutput, + NodeChildProcessWithNonDirectOutput, +} from './running-tasks/node-child-process'; +import { BatchProcess } from './running-tasks/batch-process'; +import { RunningTask } from './running-tasks/running-task'; const forkScript = join(__dirname, './fork.js'); @@ -29,7 +29,7 @@ export class ForkedProcessTaskRunner { cliPath = getCliPath(); private readonly verbose = process.env.NX_VERBOSE_LOGGING === 'true'; - private processes = new Set(); + private processes = new Set(); private pseudoTerminal: PseudoTerminal | null = PseudoTerminal.isSupported() ? getPseudoTerminal() @@ -45,81 +45,45 @@ export class ForkedProcessTaskRunner { } // TODO: vsavkin delegate terminal output printing - public forkProcessForBatch( + public async forkProcessForBatch( { executorName, taskGraph: batchTaskGraph }: Batch, fullTaskGraph: TaskGraph, env: NodeJS.ProcessEnv - ) { - return new Promise((res, rej) => { - try { - const count = Object.keys(batchTaskGraph.tasks).length; - if (count > 1) { - output.logSingleLine( - `Running ${output.bold(count)} ${output.bold( - 'tasks' - )} with ${output.bold(executorName)}` - ); - } else { - const args = getPrintableCommandArgsForTask( - Object.values(batchTaskGraph.tasks)[0] - ); - output.logCommand(args.join(' ')); - } + ): Promise { + const count = Object.keys(batchTaskGraph.tasks).length; + if (count > 1) { + output.logSingleLine( + `Running ${output.bold(count)} ${output.bold( + 'tasks' + )} with ${output.bold(executorName)}` + ); + } else { + const args = getPrintableCommandArgsForTask( + Object.values(batchTaskGraph.tasks)[0] + ); + output.logCommand(args.join(' ')); + } - const p = fork(workerPath, { - stdio: ['inherit', 'inherit', 'inherit', 'ipc'], - env, - }); - this.processes.add(p); - - p.once('exit', (code, signal) => { - this.processes.delete(p); - if (code === null) code = signalToCode(signal); - if (code !== 0) { - const results: BatchResults = {}; - for (const rootTaskId of batchTaskGraph.roots) { - results[rootTaskId] = { - success: false, - terminalOutput: '', - }; - } - rej( - new Error( - `"${executorName}" exited unexpectedly with code: ${code}` - ) - ); - } - }); + const p = fork(workerPath, { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }); + const cp = new BatchProcess(p, executorName); + this.processes.add(cp); - p.on('message', (message: BatchMessage) => { - switch (message.type) { - case BatchMessageType.CompleteBatchExecution: { - res(message.results); - break; - } - case BatchMessageType.RunTasks: { - break; - } - default: { - // Re-emit any non-batch messages from the task process - if (process.send) { - process.send(message); - } - } - } - }); + cp.onExit(() => { + this.processes.delete(cp); + }); - // Start the tasks - p.send({ - type: BatchMessageType.RunTasks, - executorName, - batchTaskGraph, - fullTaskGraph, - }); - } catch (e) { - rej(e); - } + // Start the tasks + cp.send({ + type: BatchMessageType.RunTasks, + executorName, + batchTaskGraph, + fullTaskGraph, }); + + return cp; } public async forkProcessLegacy( @@ -137,15 +101,15 @@ export class ForkedProcessTaskRunner { taskGraph: TaskGraph; env: NodeJS.ProcessEnv; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { return pipeOutput - ? await this.forkProcessPipeOutputCapture(task, { + ? this.forkProcessWithPrefixAndNotTTY(task, { temporaryOutputPath, streamOutput, taskGraph, env, }) - : await this.forkProcessDirectOutputCapture(task, { + : this.forkProcessDirectOutputCapture(task, { temporaryOutputPath, streamOutput, taskGraph, @@ -169,7 +133,7 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; disablePseudoTerminal: boolean; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { const shouldPrefix = streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; @@ -210,7 +174,7 @@ export class ForkedProcessTaskRunner { taskGraph: TaskGraph; env: NodeJS.ProcessEnv; } - ): Promise<{ code: number; terminalOutput: string }> { + ): Promise { const args = getPrintableCommandArgsForTask(task); if (streamOutput) { output.logCommand(args.join(' ')); @@ -237,41 +201,15 @@ export class ForkedProcessTaskRunner { terminalOutput += msg; }); - return new Promise((res) => { - p.onExit((code) => { - // If the exit code is greater than 128, it's a special exit code for a signal - if (code >= 128) { - process.exit(code); - } - this.writeTerminalOutput(temporaryOutputPath, terminalOutput); - res({ - code, - terminalOutput, - }); - }); + p.onExit((code) => { + if (code > 128) { + process.exit(code); + } + this.processes.delete(p); + this.writeTerminalOutput(temporaryOutputPath, terminalOutput); }); - } - private forkProcessPipeOutputCapture( - task: Task, - { - streamOutput, - temporaryOutputPath, - taskGraph, - env, - }: { - streamOutput: boolean; - temporaryOutputPath: string; - taskGraph: TaskGraph; - env: NodeJS.ProcessEnv; - } - ) { - return this.forkProcessWithPrefixAndNotTTY(task, { - streamOutput, - temporaryOutputPath, - taskGraph, - env, - }); + return p; } private forkProcessWithPrefixAndNotTTY( @@ -288,85 +226,49 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; } ) { - return new Promise<{ code: number; terminalOutput: string }>((res, rej) => { - try { - const args = getPrintableCommandArgsForTask(task); - if (streamOutput) { - output.logCommand(args.join(' ')); - } - - const p = fork(this.cliPath, { - stdio: ['inherit', 'pipe', 'pipe', 'ipc'], - env, - }); - this.processes.add(p); + try { + const args = getPrintableCommandArgsForTask(task); + if (streamOutput) { + output.logCommand(args.join(' ')); + } - // Re-emit any messages from the task process - p.on('message', (message) => { - if (process.send) { - process.send(message); - } - }); + const p = fork(this.cliPath, { + stdio: ['inherit', 'pipe', 'pipe', 'ipc'], + env, + }); - // Send message to run the executor - p.send({ - targetDescription: task.target, - overrides: task.overrides, - taskGraph, - isVerbose: this.verbose, - }); + // Send message to run the executor + p.send({ + targetDescription: task.target, + overrides: task.overrides, + taskGraph, + isVerbose: this.verbose, + }); - if (streamOutput) { - if (process.env.NX_PREFIX_OUTPUT === 'true') { - const color = getColor(task.target.project); - const prefixText = `${task.target.project}:`; - - p.stdout - .pipe( - logClearLineToPrefixTransformer(color.bold(prefixText) + ' ') - ) - .pipe(addPrefixTransformer(color.bold(prefixText))) - .pipe(process.stdout); - p.stderr - .pipe(logClearLineToPrefixTransformer(color(prefixText) + ' ')) - .pipe(addPrefixTransformer(color(prefixText))) - .pipe(process.stderr); - } else { - p.stdout.pipe(addPrefixTransformer()).pipe(process.stdout); - p.stderr.pipe(addPrefixTransformer()).pipe(process.stderr); - } - } + const cp = new NodeChildProcessWithNonDirectOutput(p, { + streamOutput, + prefix: task.target.project, + }); + this.processes.add(cp); - let outWithErr = []; - p.stdout.on('data', (chunk) => { - outWithErr.push(chunk.toString()); - }); - p.stderr.on('data', (chunk) => { - outWithErr.push(chunk.toString()); - }); + cp.onExit((code, terminalOutput) => { + this.processes.delete(cp); - p.on('exit', (code, signal) => { - this.processes.delete(p); - if (code === null) code = signalToCode(signal); - // we didn't print any output as we were running the command - // print all the collected output| - const terminalOutput = outWithErr.join(''); + if (!streamOutput) { + this.options.lifeCycle.printTaskTerminalOutput( + task, + code === 0 ? 'success' : 'failure', + terminalOutput + ); + } + this.writeTerminalOutput(temporaryOutputPath, terminalOutput); + }); - if (!streamOutput) { - this.options.lifeCycle.printTaskTerminalOutput( - task, - code === 0 ? 'success' : 'failure', - terminalOutput - ); - } - this.writeTerminalOutput(temporaryOutputPath, terminalOutput); - res({ code, terminalOutput }); - }); - } catch (e) { - console.error(e); - rej(e); - } - }); + return cp; + } catch (e) { + console.error(e); + throw e; + } } private forkProcessDirectOutputCapture( @@ -383,70 +285,56 @@ export class ForkedProcessTaskRunner { env: NodeJS.ProcessEnv; } ) { - return new Promise<{ code: number; terminalOutput: string }>((res, rej) => { - try { - const args = getPrintableCommandArgsForTask(task); - if (streamOutput) { - output.logCommand(args.join(' ')); - } - const p = fork(this.cliPath, { - stdio: ['inherit', 'inherit', 'inherit', 'ipc'], - env, - }); - this.processes.add(p); + try { + const args = getPrintableCommandArgsForTask(task); + if (streamOutput) { + output.logCommand(args.join(' ')); + } + const p = fork(this.cliPath, { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }); + const cp = new NodeChildProcessWithDirectOutput(p, temporaryOutputPath); - // Re-emit any messages from the task process - p.on('message', (message) => { - if (process.send) { - process.send(message); - } - }); + this.processes.add(cp); - // Send message to run the executor - p.send({ - targetDescription: task.target, - overrides: task.overrides, - taskGraph, - isVerbose: this.verbose, - }); + // Send message to run the executor + p.send({ + targetDescription: task.target, + overrides: task.overrides, + taskGraph, + isVerbose: this.verbose, + }); - p.on('exit', (code, signal) => { - if (code === null) code = signalToCode(signal); - // we didn't print any output as we were running the command - // print all the collected output - let terminalOutput = ''; - try { - terminalOutput = this.readTerminalOutput(temporaryOutputPath); - if (!streamOutput) { - this.options.lifeCycle.printTaskTerminalOutput( - task, - code === 0 ? 'success' : 'failure', - terminalOutput - ); - } - } catch (e) { - console.log(stripIndents` + cp.onExit((code, signal) => { + this.processes.delete(cp); + // we didn't print any output as we were running the command + // print all the collected output + try { + const terminalOutput = cp.getTerminalOutput(); + if (!streamOutput) { + this.options.lifeCycle.printTaskTerminalOutput( + task, + code === 0 ? 'success' : 'failure', + terminalOutput + ); + } + } catch (e) { + console.log(stripIndents` Unable to print terminal output for Task "${task.id}". Task failed with Exit Code ${code} and Signal "${signal}". Received error message: ${e.message} `); - } - res({ - code, - terminalOutput, - }); - }); - } catch (e) { - console.error(e); - rej(e); - } - }); - } + } + }); - private readTerminalOutput(outputPath: string) { - return readFileSync(outputPath).toString(); + return cp; + } catch (e) { + console.error(e); + throw e; + } } private writeTerminalOutput(outputPath: string, content: string) { @@ -462,13 +350,12 @@ export class ForkedProcessTaskRunner { // When the nx process gets a message, it will be sent into the task's process process.on('message', (message: Serializable) => { - // this.publisher.publish(message.toString()); if (this.pseudoTerminal) { this.pseudoTerminal.sendMessageToChildren(message); } this.processes.forEach((p) => { - if ('connected' in p && p.connected) { + if ('send' in p) { p.send(message); } }); @@ -477,94 +364,29 @@ export class ForkedProcessTaskRunner { // Terminate any task processes on exit process.on('exit', () => { this.processes.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill(); - } + p.kill(); }); }); process.on('SIGINT', () => { this.processes.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } + p.kill('SIGTERM'); }); // we exit here because we don't need to write anything to cache. process.exit(signalToCode('SIGINT')); }); process.on('SIGTERM', () => { this.processes.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } + p.kill('SIGTERM'); }); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); process.on('SIGHUP', () => { this.processes.forEach((p) => { - if ('connected' in p ? p.connected : p.isAlive) { - p.kill('SIGTERM'); - } + p.kill('SIGTERM'); }); // no exit here because we expect child processes to terminate which // will store results to the cache and will terminate this process }); } } - -const colors = [ - chalk.green, - chalk.greenBright, - chalk.red, - chalk.redBright, - chalk.cyan, - chalk.cyanBright, - chalk.yellow, - chalk.yellowBright, - chalk.magenta, - chalk.magentaBright, -]; - -function getColor(projectName: string) { - let code = 0; - for (let i = 0; i < projectName.length; ++i) { - code += projectName.charCodeAt(i); - } - const colorIndex = code % colors.length; - - return colors[colorIndex]; -} - -/** - * Prevents terminal escape sequence from clearing line prefix. - */ -function logClearLineToPrefixTransformer(prefix: string) { - let prevChunk = null; - return new Transform({ - transform(chunk, _encoding, callback) { - if (prevChunk && prevChunk.toString() === '\x1b[2K') { - chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix); - } - this.push(chunk); - prevChunk = chunk; - callback(); - }, - }); -} - -function addPrefixTransformer(prefix?: string) { - const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n'; - return new Transform({ - transform(chunk, _encoding, callback) { - const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); - list - .filter(Boolean) - .forEach((m) => - this.push( - prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator - ) - ); - callback(); - }, - }); -} diff --git a/packages/nx/src/tasks-runner/init-tasks-runner.ts b/packages/nx/src/tasks-runner/init-tasks-runner.ts index e582ee3fe0ad9..b614f01737155 100644 --- a/packages/nx/src/tasks-runner/init-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/init-tasks-runner.ts @@ -47,6 +47,10 @@ export async function initTasksRunner(nxArgs: NxArgs) { acc[task.id] = []; return acc; }, {} as any), + continuousDependencies: opts.tasks.reduce((acc, task) => { + acc[task.id] = []; + return acc; + }, {} as any), }; const taskResults = await invokeTasksRunner({ diff --git a/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts b/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts index 8b706a63c6a9d..269157b9090be 100644 --- a/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts +++ b/packages/nx/src/tasks-runner/life-cycles/formatting-utils.spec.ts @@ -59,6 +59,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -78,6 +79,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -99,6 +101,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -109,6 +112,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -131,6 +135,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -150,6 +155,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -171,6 +177,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -181,6 +188,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -200,6 +208,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -210,6 +219,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -229,6 +239,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -239,6 +250,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -262,6 +274,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -272,6 +285,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -295,6 +309,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -305,6 +320,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -315,6 +331,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] @@ -338,6 +355,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -348,6 +366,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -358,6 +377,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, { @@ -368,6 +388,7 @@ describe('formatTargetsAndProjects', () => { }, overrides: {}, parallelism: false, + continuous: false, outputs: [], }, ] diff --git a/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts b/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts index 7eefc84de5ab4..def3967731aaf 100644 --- a/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts +++ b/packages/nx/src/tasks-runner/pseudo-terminal.spec.ts @@ -17,6 +17,7 @@ describe('PseudoTerminal', () => { done(); }); }); + it('should kill a running command', (done) => { const childProcess = terminal.runCommand( 'sleep 3 && echo "hello world" > file.txt' @@ -31,18 +32,35 @@ describe('PseudoTerminal', () => { it('should subscribe to output', (done) => { const childProcess = terminal.runCommand('echo "hello world"'); - let output = ''; childProcess.onOutput((chunk) => { output += chunk; }); childProcess.onExit(() => { - expect(output.trim()).toContain('hello world'); - done(); + try { + expect(output.trim()).toContain('hello world'); + } finally { + done(); + } }); }); + it('should get results', async () => { + const childProcess = terminal.runCommand('echo "hello world"'); + + const results = await childProcess.getResults(); + + expect(results.code).toEqual(0); + expect(results.terminalOutput).toContain('hello world'); + const childProcess2 = terminal.runCommand('echo "hello jason"'); + + const results2 = await childProcess2.getResults(); + + expect(results2.code).toEqual(0); + expect(results2.terminalOutput).toContain('hello jason'); + }); + if (process.env.CI !== 'true') { it('should be tty', (done) => { const childProcess = terminal.runCommand( @@ -56,17 +74,12 @@ describe('PseudoTerminal', () => { } it('should run multiple commands', async () => { - function runCommand() { - return new Promise((res) => { - const cp1 = terminal.runCommand('whoami', {}); - - cp1.onExit(res); - }); - } - let i = 0; while (i < 10) { - await runCommand(); + const childProcess = terminal.runCommand('whoami', {}); + + await childProcess.getResults(); + i++; } }); diff --git a/packages/nx/src/tasks-runner/pseudo-terminal.ts b/packages/nx/src/tasks-runner/pseudo-terminal.ts index fc3a1bdee4b95..2e41d20157dfc 100644 --- a/packages/nx/src/tasks-runner/pseudo-terminal.ts +++ b/packages/nx/src/tasks-runner/pseudo-terminal.ts @@ -138,15 +138,32 @@ export class PseudoTerminal { export class PseudoTtyProcess { isAlive = true; - exitCallbacks = []; + private exitCallbacks: Array<(code: number) => void> = []; + private outputCallbacks: Array<(output: string) => void> = []; + + private terminalOutput = ''; constructor(private childProcess: ChildProcess) { + childProcess.onOutput((output) => { + this.terminalOutput += output; + this.outputCallbacks.forEach((cb) => cb(output)); + }); + childProcess.onExit((message) => { this.isAlive = false; - const exitCode = messageToCode(message); + const code = messageToCode(message); + childProcess.cleanup(); + + this.exitCallbacks.forEach((cb) => cb(code)); + }); + } - this.exitCallbacks.forEach((cb) => cb(exitCode)); + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code) => { + res({ code, terminalOutput: this.terminalOutput }); + }); }); } @@ -155,17 +172,17 @@ export class PseudoTtyProcess { } onOutput(callback: (message: string) => void): void { - this.childProcess.onOutput(callback); + this.outputCallbacks.push(callback); } kill(): void { - try { - this.childProcess.kill(); - } catch { - // when the child process completes before we explicitly call kill, this will throw - // do nothing - } finally { - if (this.isAlive == true) { + if (this.isAlive) { + try { + this.childProcess.kill(); + } catch { + // when the child process completes before we explicitly call kill, this will throw + // do nothing + } finally { this.isAlive = false; } } diff --git a/packages/nx/src/tasks-runner/running-tasks/batch-process.ts b/packages/nx/src/tasks-runner/running-tasks/batch-process.ts new file mode 100644 index 0000000000000..22899eb2665fa --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/batch-process.ts @@ -0,0 +1,84 @@ +import { + BatchMessage, + BatchMessageType, + BatchResults, +} from '../batch/batch-messages'; +import { ChildProcess, Serializable } from 'child_process'; +import { signalToCode } from '../../utils/exit-codes'; + +export class BatchProcess { + private exitCallbacks: Array<(code: number) => void> = []; + private resultsCallbacks: Array<(results: BatchResults) => void> = []; + + constructor( + private childProcess: ChildProcess, + private executorName: string + ) { + this.childProcess.on('message', (message: BatchMessage) => { + switch (message.type) { + case BatchMessageType.CompleteBatchExecution: { + for (const cb of this.resultsCallbacks) { + cb(message.results); + } + break; + } + case BatchMessageType.RunTasks: { + break; + } + default: { + // Re-emit any non-batch messages from the task process + if (process.send) { + process.send(message); + } + } + } + }); + + this.childProcess.once('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + + for (const cb of this.exitCallbacks) { + cb(code); + } + }); + } + + onExit(cb: (code: number) => void) { + this.exitCallbacks.push(cb); + } + + onResults(cb: (results: BatchResults) => void) { + this.resultsCallbacks.push(cb); + } + + async getResults(): Promise { + return Promise.race([ + new Promise((_, rej) => { + this.onExit((code) => { + if (code !== 0) { + rej( + new Error( + `"${this.executorName}" exited unexpectedly with code: ${code}` + ) + ); + } + }); + }), + new Promise((res) => { + this.onResults(res); + }), + ]); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + kill(signal?: NodeJS.Signals | number): void { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts b/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts new file mode 100644 index 0000000000000..1d6389524458b --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/node-child-process.ts @@ -0,0 +1,215 @@ +import { ChildProcess, Serializable } from 'child_process'; +import { signalToCode } from '../../utils/exit-codes'; +import { RunningTask } from './running-task'; +import { Transform } from 'stream'; +import * as chalk from 'chalk'; +import { readFileSync } from 'fs'; + +export class NodeChildProcessWithNonDirectOutput implements RunningTask { + private terminalOutput: string = ''; + private exitCallbacks: Array<(code: number, terminalOutput: string) => void> = + []; + + constructor( + private childProcess: ChildProcess, + { streamOutput, prefix }: { streamOutput: boolean; prefix: string } + ) { + if (streamOutput) { + if (process.env.NX_PREFIX_OUTPUT === 'true') { + const color = getColor(prefix); + const prefixText = `${prefix}:`; + + this.childProcess.stdout + .pipe(logClearLineToPrefixTransformer(color.bold(prefixText) + ' ')) + .pipe(addPrefixTransformer(color.bold(prefixText))) + .pipe(process.stdout); + this.childProcess.stderr + .pipe(logClearLineToPrefixTransformer(color(prefixText) + ' ')) + .pipe(addPrefixTransformer(color(prefixText))) + .pipe(process.stderr); + } else { + this.childProcess.stdout + .pipe(addPrefixTransformer()) + .pipe(process.stdout); + this.childProcess.stderr + .pipe(addPrefixTransformer()) + .pipe(process.stderr); + } + } + + this.childProcess.on('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + for (const cb of this.exitCallbacks) { + cb(code, this.terminalOutput); + } + }); + + // Re-emit any messages from the task process + this.childProcess.on('message', (message) => { + if (process.send) { + process.send(message); + } + }); + + this.childProcess.stdout.on('data', (chunk) => { + this.terminalOutput += chunk.toString(); + }); + this.childProcess.stderr.on('data', (chunk) => { + this.terminalOutput += chunk.toString(); + }); + } + + onExit(cb: (code: number, terminalOutput: string) => void) { + this.exitCallbacks.push(cb); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return new Promise((res) => { + this.onExit((code, terminalOutput) => { + res({ code, terminalOutput }); + }); + }); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + public kill(signal?: NodeJS.Signals | number) { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} + +function addPrefixTransformer(prefix?: string) { + const newLineSeparator = process.platform.startsWith('win') ? '\r\n' : '\n'; + return new Transform({ + transform(chunk, _encoding, callback) { + const list = chunk.toString().split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); + list + .filter(Boolean) + .forEach((m) => + this.push( + prefix ? prefix + ' ' + m + newLineSeparator : m + newLineSeparator + ) + ); + callback(); + }, + }); +} + +const colors = [ + chalk.green, + chalk.greenBright, + chalk.red, + chalk.redBright, + chalk.cyan, + chalk.cyanBright, + chalk.yellow, + chalk.yellowBright, + chalk.magenta, + chalk.magentaBright, +]; + +function getColor(projectName: string) { + let code = 0; + for (let i = 0; i < projectName.length; ++i) { + code += projectName.charCodeAt(i); + } + const colorIndex = code % colors.length; + + return colors[colorIndex]; +} + +/** + * Prevents terminal escape sequence from clearing line prefix. + */ +function logClearLineToPrefixTransformer(prefix: string) { + let prevChunk = null; + return new Transform({ + transform(chunk, _encoding, callback) { + if (prevChunk && prevChunk.toString() === '\x1b[2K') { + chunk = chunk.toString().replace(/\x1b\[1G/g, (m) => m + prefix); + } + this.push(chunk); + prevChunk = chunk; + callback(); + }, + }); +} + +export class NodeChildProcessWithDirectOutput implements RunningTask { + private terminalOutput: string | undefined; + private exitCallbacks: Array<(code: number, signal: string) => void> = []; + + private exited = false; + private exitCode: number; + + constructor( + private childProcess: ChildProcess, + private temporaryOutputPath: string + ) { + // Re-emit any messages from the task process + this.childProcess.on('message', (message) => { + if (process.send) { + process.send(message); + } + }); + + this.childProcess.on('exit', (code, signal) => { + if (code === null) code = signalToCode(signal); + + this.exited = true; + this.exitCode = code; + + for (const cb of this.exitCallbacks) { + cb(code, signal); + } + }); + } + + send(message: Serializable): void { + if (this.childProcess.connected) { + this.childProcess.send(message); + } + } + + onExit(cb: (code: number, signal: NodeJS.Signals) => void) { + this.exitCallbacks.push(cb); + } + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + const terminalOutput = this.getTerminalOutput(); + if (this.exited) { + return Promise.resolve({ + code: this.exitCode, + terminalOutput, + }); + } + await this.waitForExit(); + return Promise.resolve({ + code: this.exitCode, + terminalOutput, + }); + } + + waitForExit() { + return new Promise((res) => { + this.onExit(() => res()); + }); + } + + getTerminalOutput() { + this.terminalOutput ??= readFileSync(this.temporaryOutputPath).toString(); + return this.terminalOutput; + } + + kill(signal?: NodeJS.Signals | number): void { + if (this.childProcess.connected) { + this.childProcess.kill(signal); + } + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts b/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts new file mode 100644 index 0000000000000..173acc121de0e --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/noop-child-process.ts @@ -0,0 +1,20 @@ +import { Serializable } from 'child_process'; +import { RunningTask } from './running-task'; + +export class NoopChildProcess implements RunningTask { + constructor(private results: { code: number; terminalOutput: string }) {} + + send(): void {} + + async getResults(): Promise<{ code: number; terminalOutput: string }> { + return this.results; + } + + kill(): void { + return; + } + + onExit(cb: (code: number) => void): void { + cb(this.results.code); + } +} diff --git a/packages/nx/src/tasks-runner/running-tasks/running-task.ts b/packages/nx/src/tasks-runner/running-tasks/running-task.ts new file mode 100644 index 0000000000000..7355d16edbc03 --- /dev/null +++ b/packages/nx/src/tasks-runner/running-tasks/running-task.ts @@ -0,0 +1,7 @@ +export abstract class RunningTask { + abstract getResults(): Promise<{ code: number; terminalOutput: string }>; + + abstract onExit(cb: (code: number) => void): void; + + abstract kill(signal?: NodeJS.Signals | number): Promise | void; +} diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index 565b90ec3b7aa..e7997ee5a5b62 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -3,7 +3,7 @@ import { performance } from 'perf_hooks'; import { relative } from 'path'; import { writeFileSync } from 'fs'; import { TaskHasher } from '../hasher/task-hasher'; -import runCommandsImpl from '../executors/run-commands/run-commands.impl'; +import { runCommands } from '../executors/run-commands/run-commands.impl'; import { ForkedProcessTaskRunner } from './forked-process-task-runner'; import { Cache, DbCache, getCache } from './cache'; import { DefaultTasksRunnerOptions } from './default-tasks-runner'; @@ -33,6 +33,9 @@ import { output } from '../utils/output'; import { combineOptionsForExecutor } from '../utils/params'; import { NxJsonConfiguration } from '../config/nx-json'; import type { TaskDetails } from '../native'; +import { NoopChildProcess } from './running-tasks/noop-child-process'; +import { RunningTask } from './running-tasks/running-task'; +import { NxArgs } from '../utils/command-line-utils'; export class TaskOrchestrator { private taskDetails: TaskDetails | null = getTaskDetails(); @@ -64,6 +67,9 @@ export class TaskOrchestrator { private bailed = false; + private runningContinuousTasks = new Map(); + + private cleaningUp = false; // endregion internal state constructor( @@ -72,7 +78,7 @@ export class TaskOrchestrator { private readonly projectGraph: ProjectGraph, private readonly taskGraph: TaskGraph, private readonly nxJson: NxJsonConfiguration, - private readonly options: DefaultTasksRunnerOptions, + private readonly options: NxArgs & DefaultTasksRunnerOptions, private readonly bail: boolean, private readonly daemon: DaemonClient, private readonly outputStyle: string @@ -91,13 +97,17 @@ export class TaskOrchestrator { performance.mark('task-execution:start'); + const threadCount = + this.options.parallel + + Object.values(this.taskGraph.tasks).filter((t) => t.continuous).length; + const threads = []; - process.stdout.setMaxListeners(this.options.parallel + defaultMaxListeners); - process.stderr.setMaxListeners(this.options.parallel + defaultMaxListeners); + process.stdout.setMaxListeners(threadCount + defaultMaxListeners); + process.stderr.setMaxListeners(threadCount + defaultMaxListeners); // initial seeding of the queue - for (let i = 0; i < this.options.parallel; ++i) { + for (let i = 0; i < threadCount; ++i) { threads.push(this.executeNextBatchOfTasksUsingTaskSchedule()); } await Promise.all(threads); @@ -110,6 +120,8 @@ export class TaskOrchestrator { ); this.cache.removeOldCacheRecords(); + await this.cleanup(); + return this.completedTasks; } @@ -139,7 +151,11 @@ export class TaskOrchestrator { if (task) { const groupId = this.closeGroup(); - await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId); + if (task.continuous) { + await this.startContinuousTask(task, groupId); + } else { + await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId); + } this.openGroup(groupId); @@ -320,11 +336,13 @@ export class TaskOrchestrator { private async runBatch(batch: Batch, env: NodeJS.ProcessEnv) { try { - const results = await this.forkedProcessTaskRunner.forkProcessForBatch( - batch, - this.taskGraph, - env - ); + const batchProcess = + await this.forkedProcessTaskRunner.forkProcessForBatch( + batch, + this.taskGraph, + env + ); + const results = await batchProcess.getResults(); const batchResultEntries = Object.entries(results); return batchResultEntries.map(([taskId, result]) => ({ ...result, @@ -395,104 +413,116 @@ export class TaskOrchestrator { // the task wasn't cached if (results.length === 0) { - const shouldPrefix = - streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; - const targetConfiguration = getTargetConfigurationForTask( + const childProcess = await this.runTask( task, - this.projectGraph + streamOutput, + env, + temporaryOutputPath, + pipeOutput ); - if ( - process.env.NX_RUN_COMMANDS_DIRECTLY !== 'false' && - targetConfiguration.executor === 'nx:run-commands' && - !shouldPrefix - ) { - try { - const { schema } = getExecutorForTask(task, this.projectGraph); - const isRunOne = this.initiatingProject != null; - const combinedOptions = combineOptionsForExecutor( - task.overrides, - task.target.configuration ?? - targetConfiguration.defaultConfiguration, - targetConfiguration, - schema, - task.target.project, - relative(task.projectRoot ?? workspaceRoot, process.cwd()), - process.env.NX_VERBOSE_LOGGING === 'true' - ); - if (combinedOptions.env) { - env = { - ...env, - ...combinedOptions.env, - }; - } - if (streamOutput) { - const args = getPrintableCommandArgsForTask(task); - output.logCommand(args.join(' ')); - } - const { success, terminalOutput } = await runCommandsImpl( - { - ...combinedOptions, - env, - usePty: isRunOne && !this.tasksSchedule.hasTasks(), - streamOutput, - }, - { - root: workspaceRoot, // only root is needed in runCommandsImpl - } as any - ); - const status = success ? 'success' : 'failure'; + const { code, terminalOutput } = await childProcess.getResults(); + results.push({ + task, + status: code === 0 ? 'success' : 'failure', + terminalOutput, + }); + } + await this.postRunSteps([task], results, doNotSkipCache, { groupId }); + } + + private async runTask( + task: Task, + streamOutput: boolean, + env: { [p: string]: string | undefined; TZ?: string }, + temporaryOutputPath: string, + pipeOutput: boolean + ): Promise { + const shouldPrefix = + streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; + const targetConfiguration = getTargetConfigurationForTask( + task, + this.projectGraph + ); + if ( + process.env.NX_RUN_COMMANDS_DIRECTLY !== 'false' && + targetConfiguration.executor === 'nx:run-commands' && + !shouldPrefix + ) { + try { + const { schema } = getExecutorForTask(task, this.projectGraph); + const isRunOne = this.initiatingProject != null; + const combinedOptions = combineOptionsForExecutor( + task.overrides, + task.target.configuration ?? targetConfiguration.defaultConfiguration, + targetConfiguration, + schema, + task.target.project, + relative(task.projectRoot ?? workspaceRoot, process.cwd()), + process.env.NX_VERBOSE_LOGGING === 'true' + ); + if (combinedOptions.env) { + env = { + ...env, + ...combinedOptions.env, + }; + } + if (streamOutput) { + const args = getPrintableCommandArgsForTask(task); + output.logCommand(args.join(' ')); + } + const runningTask = await runCommands( + { + ...combinedOptions, + env, + usePty: + isRunOne && + !this.tasksSchedule.hasTasks() && + this.runningContinuousTasks.size === 0, + streamOutput, + }, + { + root: workspaceRoot, // only root is needed in runCommands + } as any + ); + + runningTask.onExit((code, terminalOutput) => { if (!streamOutput) { this.options.lifeCycle.printTaskTerminalOutput( task, - status, + code === 0 ? 'success' : 'failure', terminalOutput ); + writeFileSync(temporaryOutputPath, terminalOutput); } - writeFileSync(temporaryOutputPath, terminalOutput); - results.push({ - task, - status, - terminalOutput, - }); - } catch (e) { - if (process.env.NX_VERBOSE_LOGGING === 'true') { - console.error(e); - } else { - console.error(e.message); - } - const terminalOutput = e.stack ?? e.message ?? ''; - writeFileSync(temporaryOutputPath, terminalOutput); - results.push({ - task, - status: 'failure', - terminalOutput, - }); - } - } else if (targetConfiguration.executor === 'nx:noop') { - writeFileSync(temporaryOutputPath, ''); - results.push({ - task, - status: 'success', - terminalOutput: '', - }); - } else { - // cache prep - const { code, terminalOutput } = await this.runTaskInForkedProcess( - task, - env, - pipeOutput, - temporaryOutputPath, - streamOutput - ); - results.push({ - task, - status: code === 0 ? 'success' : 'failure', - terminalOutput, }); + + return runningTask; + } catch (e) { + if (process.env.NX_VERBOSE_LOGGING === 'true') { + console.error(e); + } else { + console.error(e.message); + } + const terminalOutput = e.stack ?? e.message ?? ''; + writeFileSync(temporaryOutputPath, terminalOutput); } + } else if (targetConfiguration.executor === 'nx:noop') { + writeFileSync(temporaryOutputPath, ''); + return new NoopChildProcess({ + code: 0, + terminalOutput: '', + }); + } else { + // cache prep + return await this.runTaskInForkedProcess( + task, + env, + pipeOutput, + temporaryOutputPath, + streamOutput + ); } - await this.postRunSteps([task], results, doNotSkipCache, { groupId }); } private async runTaskInForkedProcess( @@ -505,10 +535,10 @@ export class TaskOrchestrator { try { const usePtyFork = process.env.NX_NATIVE_COMMAND_RUNNER !== 'false'; - // Disable the pseudo terminal if this is a run-many - const disablePseudoTerminal = !this.initiatingProject; + // Disable the pseudo terminal if this is a run-many or when running a continuous task as part of a run-one + const disablePseudoTerminal = !this.initiatingProject || task.continuous; // execution - const { code, terminalOutput } = usePtyFork + const childProcess = usePtyFork ? await this.forkedProcessTaskRunner.forkProcess(task, { temporaryOutputPath, streamOutput, @@ -525,18 +555,84 @@ export class TaskOrchestrator { env, }); - return { - code, - terminalOutput, - }; + return childProcess; } catch (e) { if (process.env.NX_VERBOSE_LOGGING === 'true') { console.error(e); } - return { + return new NoopChildProcess({ code: 1, - }; + terminalOutput: undefined, + }); + } + } + + private async startContinuousTask(task: Task, groupId: number) { + const taskSpecificEnv = await this.processedTasks.get(task.id); + await this.preRunSteps([task], { groupId }); + + const pipeOutput = await this.pipeOutputCapture(task); + // obtain metadata + const temporaryOutputPath = this.cache.temporaryOutputPath(task); + const streamOutput = + this.outputStyle === 'static' + ? false + : shouldStreamOutput(task, this.initiatingProject); + + let env = pipeOutput + ? getEnvVariablesForTask( + task, + taskSpecificEnv, + process.env.FORCE_COLOR === undefined + ? 'true' + : process.env.FORCE_COLOR, + this.options.skipNxCache, + this.options.captureStderr, + null, + null + ) + : getEnvVariablesForTask( + task, + taskSpecificEnv, + undefined, + this.options.skipNxCache, + this.options.captureStderr, + temporaryOutputPath, + streamOutput + ); + const childProcess = await this.runTask( + task, + streamOutput, + env, + temporaryOutputPath, + pipeOutput + ); + this.runningContinuousTasks.set(task.id, childProcess); + + childProcess.onExit((code) => { + if (!this.cleaningUp) { + console.error( + `Task "${task.id}" is continuous but exited with code ${code}` + ); + this.cleanup().then(() => { + process.exit(1); + }); + } + }); + if ( + this.initiatingProject === task.target.project && + this.options.targets.length === 1 && + this.options.targets[0] === task.target.target + ) { + await childProcess.getResults(); + } else { + await this.tasksSchedule.scheduleNextTasks(); + // release blocked threads + this.waitingForTasks.forEach((f) => f(null)); + this.waitingForTasks.length = 0; } + + return childProcess; } // endregion Single Task @@ -725,4 +821,17 @@ export class TaskOrchestrator { } // endregion utils + + private async cleanup() { + this.cleaningUp = true; + await Promise.all( + Array.from(this.runningContinuousTasks).map(async ([taskId, t]) => { + try { + return t.kill(); + } catch (e) { + console.error(`Unable to terminate ${taskId}\nError:`, e); + } + }) + ); + } } diff --git a/packages/nx/src/tasks-runner/tasks-schedule.spec.ts b/packages/nx/src/tasks-runner/tasks-schedule.spec.ts index ed4fcd07ccd8b..b4f481ed5c567 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.spec.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.spec.ts @@ -18,6 +18,7 @@ function createMockTask(id: string, parallelism: boolean = true): Task { outputs: [], overrides: {}, parallelism, + continuous: false, }; } @@ -65,6 +66,11 @@ describe('TasksSchedule', () => { 'app2:build': [], 'lib1:build': [], }, + continuousDependencies: { + 'app1:build': [], + 'app2:build': [], + 'lib1:build': [], + }, roots: ['lib1:build', 'app2:build'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); @@ -274,6 +280,13 @@ describe('TasksSchedule', () => { 'app4:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'app2:test': [], + 'app3:test': [], + 'app4:test': [], + 'lib1:test': [], + }, roots: [ 'app1:test', 'app2:test', @@ -552,6 +565,11 @@ describe('TasksSchedule', () => { 'app2:build': [], 'lib1:build': [], }, + continuousDependencies: { + 'app1:build': [], + 'app2:build': [], + 'lib1:build': [], + }, roots: ['lib1:build', 'app2:build'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); @@ -719,6 +737,11 @@ describe('TasksSchedule', () => { 'app2:test': [], 'lib1:test': [], }, + continuousDependencies: { + 'app1:test': [], + 'app2:test': [], + 'lib1:test': [], + }, roots: ['app1:test', 'app2:test', 'lib1:test'], }; jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({}); diff --git a/packages/nx/src/tasks-runner/tasks-schedule.ts b/packages/nx/src/tasks-runner/tasks-schedule.ts index 3374c536314a1..d7df4e981a5ac 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.ts @@ -212,12 +212,15 @@ export class TasksSchedule { ({ tasks: {}, dependencies: {}, + continuousDependencies: {}, roots: [], } as TaskGraph)); batch.tasks[task.id] = task; batch.dependencies[task.id] = this.notScheduledTaskGraph.dependencies[task.id]; + batch.continuousDependencies[task.id] = + this.notScheduledTaskGraph.continuousDependencies[task.id]; if (isRoot) { batch.roots.push(task.id); } @@ -251,9 +254,13 @@ export class TasksSchedule { const hasDependenciesCompleted = this.taskGraph.dependencies[taskId].every( (id) => this.completedTasks.has(id) ); + const hasContinuousDependenciesStarted = + this.taskGraph.continuousDependencies[taskId].every((id) => + this.runningTasks.has(id) + ); // if dependencies have not completed, cannot schedule - if (!hasDependenciesCompleted) { + if (!hasDependenciesCompleted || !hasContinuousDependenciesStarted) { return false; } diff --git a/packages/nx/src/tasks-runner/utils.ts b/packages/nx/src/tasks-runner/utils.ts index 660c054b39184..fd74a06e052a1 100644 --- a/packages/nx/src/tasks-runner/utils.ts +++ b/packages/nx/src/tasks-runner/utils.ts @@ -451,18 +451,20 @@ export function removeTasksFromTaskGraph( graph: TaskGraph, ids: string[] ): TaskGraph { - const newGraph = removeIdsFromGraph(graph, ids, graph.tasks); + const newGraph = removeIdsFromTaskGraph(graph, ids, graph.tasks); return { dependencies: newGraph.dependencies, + continuousDependencies: newGraph.continuousDependencies, roots: newGraph.roots, tasks: newGraph.mapWithIds, }; } -export function removeIdsFromGraph( +function removeIdsFromTaskGraph( graph: { roots: string[]; dependencies: Record; + continuousDependencies: Record; }, ids: string[], mapWithIds: Record @@ -470,9 +472,11 @@ export function removeIdsFromGraph( mapWithIds: Record; roots: string[]; dependencies: Record; + continuousDependencies: Record; } { const filteredMapWithIds = {}; const dependencies = {}; + const continuousDependencies = {}; const removedSet = new Set(ids); for (let id of Object.keys(mapWithIds)) { if (!removedSet.has(id)) { @@ -480,13 +484,18 @@ export function removeIdsFromGraph( dependencies[id] = graph.dependencies[id].filter( (depId) => !removedSet.has(depId) ); + continuousDependencies[id] = graph.continuousDependencies[id].filter( + (depId) => !removedSet.has(depId) + ); } } return { mapWithIds: filteredMapWithIds, dependencies: dependencies, - roots: Object.keys(dependencies).filter( - (k) => dependencies[k].length === 0 + continuousDependencies, + roots: Object.keys(filteredMapWithIds).filter( + (k) => + dependencies[k].length === 0 && continuousDependencies[k].length === 0 ), }; } @@ -505,6 +514,12 @@ export function calculateReverseDeps( }); }); + Object.keys(taskGraph.continuousDependencies).forEach((taskId) => { + taskGraph.continuousDependencies[taskId].forEach((d) => { + reverseTaskDeps[d].push(taskId); + }); + }); + return reverseTaskDeps; }