diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index 0568eaf79167f..94e464222361a 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -11,6 +11,7 @@ export const LOG_SCOPES = [ 'redis', 'scaling', 'waiting-executions', + 'task-runner', ] as const; export type LogScope = (typeof LOG_SCOPES)[number]; diff --git a/packages/cli/src/runners/__tests__/forward-to-logger.test.ts b/packages/cli/src/runners/__tests__/forward-to-logger.test.ts new file mode 100644 index 0000000000000..64352ab54d62e --- /dev/null +++ b/packages/cli/src/runners/__tests__/forward-to-logger.test.ts @@ -0,0 +1,114 @@ +import type { Logger } from 'n8n-workflow'; +import { Readable } from 'stream'; + +import { forwardToLogger } from '../forward-to-logger'; + +describe('forwardToLogger', () => { + let logger: Logger; + let stdout: Readable; + let stderr: Readable; + + beforeEach(() => { + logger = { + info: jest.fn(), + error: jest.fn(), + } as unknown as Logger; + + stdout = new Readable({ read() {} }); + stderr = new Readable({ read() {} }); + + jest.resetAllMocks(); + }); + + const pushToStdout = async (data: string) => { + stdout.push(Buffer.from(data)); + stdout.push(null); + // Wait for the next tick to allow the event loop to process the data + await new Promise((resolve) => setImmediate(resolve)); + }; + + const pushToStderr = async (data: string) => { + stderr.push(Buffer.from(data)); + stderr.push(null); + // Wait for the next tick to allow the event loop to process the data + await new Promise((resolve) => setImmediate(resolve)); + }; + + it('should forward stdout data to logger.info', async () => { + forwardToLogger(logger, { stdout, stderr: null }); + + await pushToStdout('Test stdout message'); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(logger.info).toHaveBeenCalledWith('Test stdout message'); + }); + + it('should forward stderr data to logger.error', async () => { + forwardToLogger(logger, { stdout: null, stderr }); + + await pushToStderr('Test stderr message'); + + expect(logger.error).toHaveBeenCalledWith('Test stderr message'); + }); + + it('should remove trailing newline from stdout', async () => { + forwardToLogger(logger, { stdout, stderr: null }); + + await pushToStdout('Test stdout message\n'); + + expect(logger.info).toHaveBeenCalledWith('Test stdout message'); + }); + + it('should remove trailing newline from stderr', async () => { + forwardToLogger(logger, { stdout: null, stderr }); + + await pushToStderr('Test stderr message\n'); + + expect(logger.error).toHaveBeenCalledWith('Test stderr message'); + }); + + it('should forward stderr data to logger.error', async () => { + forwardToLogger(logger, { stdout: null, stderr }); + + await pushToStderr('Test stderr message'); + + expect(logger.error).toHaveBeenCalledWith('Test stderr message'); + }); + + it('should include prefix if provided for stdout', async () => { + const prefix = '[PREFIX]'; + forwardToLogger(logger, { stdout, stderr: null }, prefix); + + await pushToStdout('Message with prefix'); + + expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix'); + }); + + it('should include prefix if provided for stderr', async () => { + const prefix = '[PREFIX]'; + forwardToLogger(logger, { stdout: null, stderr }, prefix); + + await pushToStderr('Error message with prefix'); + + expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix'); + }); + + it('should make sure there is no duplicate space after prefix for stdout', async () => { + const prefix = '[PREFIX] '; + forwardToLogger(logger, { stdout, stderr: null }, prefix); + + await pushToStdout('Message with prefix'); + + expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix'); + }); + + it('should make sure there is no duplicate space after prefix for stderr', async () => { + const prefix = '[PREFIX] '; + forwardToLogger(logger, { stdout: null, stderr }, prefix); + + await pushToStderr('Error message with prefix'); + + expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix'); + }); +}); diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index 8940428789c6d..eb04e3ab8ed5a 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -2,9 +2,10 @@ import { TaskRunnersConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import type { ChildProcess, SpawnOptions } from 'node:child_process'; -import { mockInstance } from '../../../test/shared/mocking'; -import type { TaskRunnerAuthService } from '../auth/task-runner-auth.service'; -import { TaskRunnerProcess } from '../task-runner-process'; +import { Logger } from '@/logging/logger.service'; +import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.service'; +import { TaskRunnerProcess } from '@/runners/task-runner-process'; +import { mockInstance } from '@test/mocking'; const spawnMock = jest.fn(() => mock({ @@ -19,11 +20,12 @@ const spawnMock = jest.fn(() => require('child_process').spawn = spawnMock; describe('TaskRunnerProcess', () => { + const logger = mockInstance(Logger); const runnerConfig = mockInstance(TaskRunnersConfig); runnerConfig.disabled = false; runnerConfig.mode = 'internal_childprocess'; const authService = mock(); - let taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService); + let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); afterEach(async () => { spawnMock.mockClear(); @@ -33,7 +35,7 @@ describe('TaskRunnerProcess', () => { it('should throw if runner mode is external', () => { runnerConfig.mode = 'external'; - expect(() => new TaskRunnerProcess(runnerConfig, authService)).toThrow(); + expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow(); runnerConfig.mode = 'internal_childprocess'; }); @@ -41,7 +43,7 @@ describe('TaskRunnerProcess', () => { describe('start', () => { beforeEach(() => { - taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService); + taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); }); test.each(['PATH', 'NODE_FUNCTION_ALLOW_BUILTIN', 'NODE_FUNCTION_ALLOW_EXTERNAL'])( diff --git a/packages/cli/src/runners/forward-to-logger.ts b/packages/cli/src/runners/forward-to-logger.ts new file mode 100644 index 0000000000000..0bcc813225267 --- /dev/null +++ b/packages/cli/src/runners/forward-to-logger.ts @@ -0,0 +1,42 @@ +import type { Logger } from 'n8n-workflow'; +import type { Readable } from 'stream'; + +/** + * Forwards stdout and stderr of a given producer to the given + * logger's info and error methods respectively. + */ +export function forwardToLogger( + logger: Logger, + producer: { + stdout?: Readable | null; + stderr?: Readable | null; + }, + prefix?: string, +) { + if (prefix) { + prefix = prefix.trimEnd(); + } + + const stringify = (data: Buffer) => { + let str = data.toString(); + + // Remove possible trailing newline (otherwise it's duplicated) + if (str.endsWith('\n')) { + str = str.slice(0, -1); + } + + return prefix ? `${prefix} ${str}` : str; + }; + + if (producer.stdout) { + producer.stdout.on('data', (data: Buffer) => { + logger.info(stringify(data)); + }); + } + + if (producer.stderr) { + producer.stderr.on('data', (data: Buffer) => { + logger.error(stringify(data)); + }); + } +} diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index 0415b910b1056..f4c219ead7180 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -4,8 +4,11 @@ import { spawn } from 'node:child_process'; import * as process from 'node:process'; import { Service } from 'typedi'; +import { OnShutdown } from '@/decorators/on-shutdown'; +import { Logger } from '@/logging/logger.service'; + import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; -import { OnShutdown } from '../decorators/on-shutdown'; +import { forwardToLogger } from './forward-to-logger'; type ChildProcess = ReturnType; @@ -38,6 +41,8 @@ export class TaskRunnerProcess { private isShuttingDown = false; + private logger: Logger; + private readonly passthroughEnvVars = [ 'PATH', 'NODE_FUNCTION_ALLOW_BUILTIN', @@ -45,6 +50,7 @@ export class TaskRunnerProcess { ] as const; constructor( + logger: Logger, private readonly runnerConfig: TaskRunnersConfig, private readonly authService: TaskRunnerAuthService, ) { @@ -52,6 +58,8 @@ export class TaskRunnerProcess { this.runnerConfig.mode === 'internal_childprocess' || this.runnerConfig.mode === 'internal_launcher', ); + + this.logger = logger.scoped('task-runner'); } async start() { @@ -64,8 +72,7 @@ export class TaskRunnerProcess { ? this.startLauncher(grantToken, n8nUri) : this.startNode(grantToken, n8nUri); - this.process.stdout?.pipe(process.stdout); - this.process.stderr?.pipe(process.stderr); + forwardToLogger(this.logger, this.process, '[Task Runner]: '); this.monitorProcess(this.process); }