Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix worker death #668

Merged
merged 6 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,127 @@ test.serial("Don't send adaptor logs to stdout", (t) => {
});
});

// https://github.com/OpenFn/kit/pull/668
// This test relies on a capacity of 1
test.serial(
'keep claiming work after a run with an uncaught exception',
(t) => {
return new Promise(async (done) => {
const finished: Record<string, true> = {};

const onComplete = (evt) => {
const id = evt.runId;
finished[id] = true;

if (id === 'a20') {
t.is(evt.payload.reason, 'crash');
}
if (id === 'a21') {
t.is(evt.payload.reason, 'success');
}

if (finished.a20 && finished.a21) {
t.pass('both runs completed');
done();
}
};

lightning.on('run:complete', onComplete);

const body = `
fn(
() => new Promise(() => {
setTimeout(() => {
throw new Error('uncaught')
}, 1)
})
)
`;

lightning.enqueueRun({
id: 'a20',
jobs: [
{
id: 'j1',
adaptor: '@openfn/language-common@latest',
body,
},
],
});

lightning.enqueueRun({
id: 'a21',
jobs: [
{
id: 'j2',
adaptor: '@openfn/language-common@latest',
body: 'fn(() => ({ data: { answer: 42} }))',
},
],
});
});
}
);

// https://github.com/OpenFn/kit/pull/668
// This test relies on a capacity of 1
test.serial('keep claiming work after a run with a process.exit', (t) => {
return new Promise(async (done) => {
const finished: Record<string, true> = {};

const onComplete = (evt) => {
const id = evt.runId;
finished[id] = true;

if (id === 'a20') {
t.is(evt.payload.reason, 'crash');
}
if (id === 'a21') {
t.is(evt.payload.reason, 'success');
}

if (finished.a20 && finished.a21) {
t.pass('both runs completed');
done();
}
};

lightning.on('run:complete', onComplete);

const body = `
fn(
() => new Promise(() => {
setTimeout(() => {
process.exit()
}, 1)
})
)
`;

lightning.enqueueRun({
id: 'a20',
jobs: [
{
id: 'j1',
adaptor: '@openfn/language-common@latest',
body,
},
],
});

lightning.enqueueRun({
id: 'a21',
jobs: [
{
id: 'j2',
adaptor: '@openfn/language-common@latest',
body: 'fn(() => ({ data: { answer: 42} }))',
},
],
});
});
});

test.serial(
'stateful adaptor should create a new client for each attempt',
(t) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ export default function initWorkers(

const closeWorkers = async (instant?: boolean) => workers.destroy(instant);

return { callWorker, closeWorkers };
return { callWorker, closeWorkers, workers };
}
17 changes: 12 additions & 5 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const execute = async (context: ExecutionContext) => {
});
}

let didError = false;
const events = {
[workerEvents.WORKFLOW_START]: (evt: workerEvents.WorkflowStartEvent) => {
workflowStart(context, evt);
Expand All @@ -112,6 +113,7 @@ const execute = async (context: ExecutionContext) => {
},
// TODO this is also untested
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
didError = true;
error(context, { workflowId: state.plan.id, error: evt.error });
},
};
Expand All @@ -122,11 +124,16 @@ const execute = async (context: ExecutionContext) => {
events,
workerOptions
).catch((e: any) => {
// TODO are timeout errors being handled nicely here?
// actually I think the occur outside of here, in the pool

error(context, { workflowId: state.plan.id, error: e });
logger.error(`Critical error thrown by ${state.plan.id}`, e);
// An error should:
// a) emit an error event (and so be handled by the error() function
// b) reject the task in the pool
// This guard just ensures that error logging is not duplicated
// if both the above are true (as expected), but that there's still some
// fallback handling if the error event wasn't issued
if (!didError) {
error(context, { workflowId: state.plan.id, error: e });
logger.error(`Critical error thrown by ${state.plan.id}`, e);
}
});
} catch (e: any) {
if (!e.severity) {
Expand Down
1 change: 0 additions & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ export const error = (
event: internalEvents.ErrorEvent
) => {
const { threadId = '-', error } = event;

context.emit(externalEvents.WORKFLOW_ERROR, {
threadId,
// @ts-ignore
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ const createEngine = async (

const engine = new Engine() as EngineAPI;

const { callWorker, closeWorkers } = initWorkers(
const { callWorker, closeWorkers, workers } = initWorkers(
resolvedWorkerPath,
{
maxWorkers: options.maxWorkers,
Expand Down Expand Up @@ -239,6 +239,7 @@ const createEngine = async (
execute: executeWrapper,
listen,
destroy,
workers,
});
};

Expand Down
10 changes: 8 additions & 2 deletions packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,16 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
env: options.env || {},

// This pipes the stderr stream onto the child, so we can read it later
stdio: ['ipc', 'ignore', 'pipe'],
stdio: ['ipc', 'pipe', 'pipe'],
});

// Note: Ok, now I have visibility on the stdout stream
// I don't think I want to send this to gpc
// This might be strictly local debug
// child.stdout!.on('data', (data) => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incidental debugging code. I do want to leave it here but commented out for now.

// console.log(data.toString());
// });

logger.debug('pool: Created new child process', child.pid);
allWorkers[child.pid!] = child;
} else {
Expand Down Expand Up @@ -158,7 +165,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
} catch (e) {
// do nothing
}

reject(new ExitError(code));
finish(worker);
}
Expand Down
43 changes: 22 additions & 21 deletions packages/engine-multi/src/worker/thread/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// utilities to run inside the worker
// This is designed to minimize the amount of code we have to mock

import process from 'node:process';
import stringify from 'fast-safe-stringify';
import createLogger, { SanitizePolicies } from '@openfn/logger';
Expand Down Expand Up @@ -66,17 +65,23 @@ export const createLoggers = (
// Execute wrapper function
export const execute = async (
workflowId: string,
executeFn: () => Promise<any> | undefined
executeFn: () => Promise<any> | undefined,
publishFn = publish
) => {
const handleError = (err: any) => {
publish(workerEvents.ERROR, {
publishFn(workerEvents.ERROR, {
// @ts-ignore
workflowId,
// Map the error out of the thread in a serializable format
error: serializeError(err),
stack: err?.stack
stack: err?.stack,
// TODO job id maybe
});

// Explicitly send a reject task error, to ensure the worker closes down
publish(workerEvents.ENGINE_REJECT_TASK, {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix and this is what WASN'T happening before.

It's not enough to tell the worker we've errored. We need to send an event to the parent process to tell it that we're dead.

I'm a little concerned about duplicating error messages here and so there's a little bit of gnarly code on the reporting side to handle that.

error: serializeError(err),
});
};

process.on('exit', (code: number) => {
Expand All @@ -91,39 +96,35 @@ export const execute = async (
// it'll be ignored (ie the workerEmit call will fail)
process.on('uncaughtException', async (err: any) => {
// Log this error to local stdout. This won't be sent out of the worker thread.
console.debug(`Uncaught exception in worker thread (workflow ${workflowId} )`)
console.debug(err)

console.debug(
`Uncaught exception in worker thread (workflow ${workflowId} )`
);
console.debug(err);

// Also try and log to the workflow's logger
try {
console.error(`Uncaught exception in worker thread (workflow ${workflowId} )`)
console.error(err)
} catch(e) {
console.error(`Uncaught exception in worker thread`)
console.error(
`Uncaught exception in worker thread (workflow ${workflowId} )`
);
console.error(err);
} catch (e) {
console.error(`Uncaught exception in worker thread`);
}

// For now, we'll write this off as a crash-level generic execution error
// TODO did this come from job or adaptor code?
const e = new ExecutionError(err);
e.severity = 'crash'; // Downgrade this to a crash because it's likely not our fault
handleError(e);

// Close down the process just to be 100% sure that all async code stops
// This is in a timeout to give the emitted message time to escape
// There is a TINY WINDOW in which async code can still run and affect the next run
// This should all go away when we replace workerpool
setTimeout(() => {
process.exit(HANDLED_EXIT_CODE);
}, 2);
});

publish(workerEvents.WORKFLOW_START, {
publishFn(workerEvents.WORKFLOW_START, {
workflowId,
});

try {
const result = await executeFn();
publish(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });
publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });

// For tests
return result;
Expand Down
49 changes: 43 additions & 6 deletions packages/engine-multi/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import path from 'node:path';
import { createMockLogger } from '@openfn/logger';

import createEngine, { EngineOptions } from '../src/engine';
import { WORKFLOW_ERROR } from '../src/events';
import { WORKFLOW_ERROR, WORKFLOW_COMPLETE } from '../src/events';
import type { RuntimeEngine } from '../src/types';

let engine: RuntimeEngine;
Expand Down Expand Up @@ -138,10 +138,7 @@ test.serial.skip('vm oom error', (t) => {
});
});

// https://github.com/OpenFn/kit/issues/509
// TODO this passes standalone, but will trigger an exception in the next test
// This should start working again once we spin up the worker thread
test.serial.skip('execution error from async code', (t) => {
test.serial('execution error from async code', (t) => {
return new Promise((done) => {
const plan = {
id: 'e',
Expand All @@ -153,7 +150,7 @@ test.serial.skip('execution error from async code', (t) => {
// In which case it'll be ignored
// Also note that the wrapping promise will never resolve
expression: `export default [(s) => new Promise((r) => {
setTimeout(() => { throw new Error(\"e1324\"); r() }, 10)
setTimeout(() => { throw new Error(\"err\"); r() }, 10)
})]`,
},
],
Expand All @@ -170,6 +167,46 @@ test.serial.skip('execution error from async code', (t) => {
});
});

test.serial('after uncaught exception, free up the pool', (t) => {
const plan1 = {
id: 'e',
workflow: {
steps: [
{
expression: `export default [(s) => new Promise((r) => {
setTimeout(() => { throw new Error(\"err\"); r() }, 10)
})]`,
},
],
},
options: {},
};
const plan2 = {
id: 'a',
workflow: {
steps: [
{
expression: `export default [(s) => s]`,
},
],
},
options: {},
};

return new Promise((done) => {
engine.execute(plan1, {}).on(WORKFLOW_ERROR, (evt) => {
t.log('First workflow failed');
t.is(evt.type, 'ExecutionError');
t.is(evt.severity, 'crash');

engine.execute(plan2, {}).on(WORKFLOW_COMPLETE, () => {
t.log('Second workflow completed');
done();
});
});
});
});

test.serial('emit a crash error on process.exit()', (t) => {
return new Promise((done) => {
const plan = {
Expand Down
Loading