Skip to content

Commit

Permalink
feat: Forward logs from task runner to logger (no-changelog) (#11422)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomi authored Oct 28, 2024
1 parent c56f30c commit d4c4db8
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 9 deletions.
1 change: 1 addition & 0 deletions packages/@n8n/config/src/configs/logging.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const LOG_SCOPES = [
'redis',
'scaling',
'waiting-executions',
'task-runner',
] as const;

export type LogScope = (typeof LOG_SCOPES)[number];
Expand Down
114 changes: 114 additions & 0 deletions packages/cli/src/runners/__tests__/forward-to-logger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import type { Logger } from 'n8n-workflow';
import { Readable } from 'stream';

import { forwardToLogger } from '../forward-to-logger';

describe('forwardToLogger', () => {
let logger: Logger;
let stdout: Readable;
let stderr: Readable;

beforeEach(() => {
logger = {
info: jest.fn(),
error: jest.fn(),
} as unknown as Logger;

stdout = new Readable({ read() {} });
stderr = new Readable({ read() {} });

jest.resetAllMocks();
});

const pushToStdout = async (data: string) => {
stdout.push(Buffer.from(data));
stdout.push(null);
// Wait for the next tick to allow the event loop to process the data
await new Promise((resolve) => setImmediate(resolve));
};

const pushToStderr = async (data: string) => {
stderr.push(Buffer.from(data));
stderr.push(null);
// Wait for the next tick to allow the event loop to process the data
await new Promise((resolve) => setImmediate(resolve));
};

it('should forward stdout data to logger.info', async () => {
forwardToLogger(logger, { stdout, stderr: null });

await pushToStdout('Test stdout message');

await new Promise((resolve) => setImmediate(resolve));

expect(logger.info).toHaveBeenCalledWith('Test stdout message');
});

it('should forward stderr data to logger.error', async () => {
forwardToLogger(logger, { stdout: null, stderr });

await pushToStderr('Test stderr message');

expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});

it('should remove trailing newline from stdout', async () => {
forwardToLogger(logger, { stdout, stderr: null });

await pushToStdout('Test stdout message\n');

expect(logger.info).toHaveBeenCalledWith('Test stdout message');
});

it('should remove trailing newline from stderr', async () => {
forwardToLogger(logger, { stdout: null, stderr });

await pushToStderr('Test stderr message\n');

expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});

it('should forward stderr data to logger.error', async () => {
forwardToLogger(logger, { stdout: null, stderr });

await pushToStderr('Test stderr message');

expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});

it('should include prefix if provided for stdout', async () => {
const prefix = '[PREFIX]';
forwardToLogger(logger, { stdout, stderr: null }, prefix);

await pushToStdout('Message with prefix');

expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix');
});

it('should include prefix if provided for stderr', async () => {
const prefix = '[PREFIX]';
forwardToLogger(logger, { stdout: null, stderr }, prefix);

await pushToStderr('Error message with prefix');

expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix');
});

it('should make sure there is no duplicate space after prefix for stdout', async () => {
const prefix = '[PREFIX] ';
forwardToLogger(logger, { stdout, stderr: null }, prefix);

await pushToStdout('Message with prefix');

expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix');
});

it('should make sure there is no duplicate space after prefix for stderr', async () => {
const prefix = '[PREFIX] ';
forwardToLogger(logger, { stdout: null, stderr }, prefix);

await pushToStderr('Error message with prefix');

expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix');
});
});
14 changes: 8 additions & 6 deletions packages/cli/src/runners/__tests__/task-runner-process.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import type { ChildProcess, SpawnOptions } from 'node:child_process';

import { mockInstance } from '../../../test/shared/mocking';
import type { TaskRunnerAuthService } from '../auth/task-runner-auth.service';
import { TaskRunnerProcess } from '../task-runner-process';
import { Logger } from '@/logging/logger.service';
import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.service';
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { mockInstance } from '@test/mocking';

const spawnMock = jest.fn(() =>
mock<ChildProcess>({
Expand All @@ -19,11 +20,12 @@ const spawnMock = jest.fn(() =>
require('child_process').spawn = spawnMock;

describe('TaskRunnerProcess', () => {
const logger = mockInstance(Logger);
const runnerConfig = mockInstance(TaskRunnersConfig);
runnerConfig.disabled = false;
runnerConfig.mode = 'internal_childprocess';
const authService = mock<TaskRunnerAuthService>();
let taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService);
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);

afterEach(async () => {
spawnMock.mockClear();
Expand All @@ -33,15 +35,15 @@ describe('TaskRunnerProcess', () => {
it('should throw if runner mode is external', () => {
runnerConfig.mode = 'external';

expect(() => new TaskRunnerProcess(runnerConfig, authService)).toThrow();
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();

runnerConfig.mode = 'internal_childprocess';
});
});

describe('start', () => {
beforeEach(() => {
taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService);
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
});

test.each(['PATH', 'NODE_FUNCTION_ALLOW_BUILTIN', 'NODE_FUNCTION_ALLOW_EXTERNAL'])(
Expand Down
42 changes: 42 additions & 0 deletions packages/cli/src/runners/forward-to-logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { Logger } from 'n8n-workflow';
import type { Readable } from 'stream';

/**
* Forwards stdout and stderr of a given producer to the given
* logger's info and error methods respectively.
*/
export function forwardToLogger(
logger: Logger,
producer: {
stdout?: Readable | null;
stderr?: Readable | null;
},
prefix?: string,
) {
if (prefix) {
prefix = prefix.trimEnd();
}

const stringify = (data: Buffer) => {
let str = data.toString();

// Remove possible trailing newline (otherwise it's duplicated)
if (str.endsWith('\n')) {
str = str.slice(0, -1);
}

return prefix ? `${prefix} ${str}` : str;
};

if (producer.stdout) {
producer.stdout.on('data', (data: Buffer) => {
logger.info(stringify(data));
});
}

if (producer.stderr) {
producer.stderr.on('data', (data: Buffer) => {
logger.error(stringify(data));
});
}
}
13 changes: 10 additions & 3 deletions packages/cli/src/runners/task-runner-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { spawn } from 'node:child_process';
import * as process from 'node:process';
import { Service } from 'typedi';

import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service';

import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { OnShutdown } from '../decorators/on-shutdown';
import { forwardToLogger } from './forward-to-logger';

type ChildProcess = ReturnType<typeof spawn>;

Expand Down Expand Up @@ -38,20 +41,25 @@ export class TaskRunnerProcess {

private isShuttingDown = false;

private logger: Logger;

private readonly passthroughEnvVars = [
'PATH',
'NODE_FUNCTION_ALLOW_BUILTIN',
'NODE_FUNCTION_ALLOW_EXTERNAL',
] as const;

constructor(
logger: Logger,
private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService,
) {
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
);

this.logger = logger.scoped('task-runner');
}

async start() {
Expand All @@ -64,8 +72,7 @@ export class TaskRunnerProcess {
? this.startLauncher(grantToken, n8nUri)
: this.startNode(grantToken, n8nUri);

this.process.stdout?.pipe(process.stdout);
this.process.stderr?.pipe(process.stderr);
forwardToLogger(this.logger, this.process, '[Task Runner]: ');

this.monitorProcess(this.process);
}
Expand Down

0 comments on commit d4c4db8

Please sign in to comment.