Merge pull request #668 from OpenFn/fix-worker-death
Fix worker death
josephjclark authored Apr 17, 2024
2 parents 65e4031 + 781a75d commit 94cec66
Showing 17 changed files with 285 additions and 76 deletions.
9 changes: 9 additions & 0 deletions integration-tests/worker/CHANGELOG.md
@@ -1,5 +1,14 @@
# @openfn/integration-tests-worker

## 1.0.40

### Patch Changes

- Updated dependencies
- @openfn/engine-multi@1.1.5
- @openfn/lightning-mock@2.0.5
- @openfn/ws-worker@1.1.5

## 1.0.39

### Patch Changes
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.39",
"version": "1.0.40",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
155 changes: 121 additions & 34 deletions integration-tests/worker/test/integration.test.ts
@@ -403,40 +403,127 @@ test.serial('an OOM error should still call step-complete', (t) => {
});
});

// test.serial('run a job with complex behaviours (initial state, branching)', (t) => {
// const attempt = {
// id: 'a1',
// initialState: 's1
// jobs: [
// {
// id: 'j1',
// body: 'const fn = (f) => (state) => f(state); fn(() => ({ data: { answer: 42} }))',
// },
// ],
// }

// initLightning();
// lightning.on('run:complete', (evt) => {
// // This will fetch the final dataclip from the attempt
// const result = lightning.getResult('a1');
// t.deepEqual(result, { data: { answer: 42 } });

// t.pass('completed attempt');
// done();
// });
// initWorker();

// lightning.enqueueRun({
// id: 'a1',
// jobs: [
// {
// id: 'j1',
// body: 'const fn = (f) => (state) => f(state); fn(() => ({ data: { answer: 42} }))',
// },
// ],
// });
// });
// });
// https://github.com/OpenFn/kit/pull/668
// This test relies on a capacity of 1
test.serial(
'keep claiming work after a run with an uncaught exception',
(t) => {
return new Promise(async (done) => {
const finished: Record<string, true> = {};

const onComplete = (evt) => {
const id = evt.runId;
finished[id] = true;

if (id === 'a20') {
t.is(evt.payload.reason, 'crash');
}
if (id === 'a21') {
t.is(evt.payload.reason, 'success');
}

if (finished.a20 && finished.a21) {
t.pass('both runs completed');
done();
}
};

lightning.on('run:complete', onComplete);

const body = `
fn(
() => new Promise(() => {
setTimeout(() => {
throw new Error('uncaught')
}, 1)
})
)
`;

lightning.enqueueRun({
id: 'a20',
jobs: [
{
id: 'j1',
adaptor: '@openfn/language-common@latest',
body,
},
],
});

lightning.enqueueRun({
id: 'a21',
jobs: [
{
id: 'j2',
adaptor: '@openfn/language-common@latest',
body: 'fn(() => ({ data: { answer: 42} }))',
},
],
});
});
}
);

// https://github.com/OpenFn/kit/pull/668
// This test relies on a capacity of 1
test.serial('keep claiming work after a run with a process.exit', (t) => {
return new Promise(async (done) => {
const finished: Record<string, true> = {};

const onComplete = (evt) => {
const id = evt.runId;
finished[id] = true;

if (id === 'a30') {
t.is(evt.payload.reason, 'crash');
}
if (id === 'a31') {
t.is(evt.payload.reason, 'success');
}

if (finished.a30 && finished.a31) {
t.pass('both runs completed');
done();
}
};

lightning.on('run:complete', onComplete);

const body = `
fn(
() => new Promise(() => {
setTimeout(() => {
process.exit()
}, 1)
})
)
`;

lightning.enqueueRun({
id: 'a30',
jobs: [
{
id: 'j1',
adaptor: '@openfn/language-common@latest',
body,
},
],
});

lightning.enqueueRun({
id: 'a31',
jobs: [
{
id: 'j2',
adaptor: '@openfn/language-common@latest',
body: 'fn(() => ({ data: { answer: 42} }))',
},
],
});
});
});

test.serial("Don't send job logs to stdout", (t) => {
return new Promise(async (done) => {
const attempt = {
6 changes: 6 additions & 0 deletions packages/engine-multi/CHANGELOG.md
@@ -1,5 +1,11 @@
# engine-multi

## 1.1.5

### Patch Changes

- Fix an issue where failed steps might not error correctly, stopping the pool from reclaiming the slot

## 1.1.4

### Patch Changes
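The changelog entry above is terse, so here is the failure mode in miniature: if a step fails in a way that never settles its task promise, a pool running at capacity 1 never frees its slot and stops claiming new runs. The sketch below is purely illustrative (the `SingleSlotPool` class and its methods are invented names, not the engine's actual API), but it shows why the fix is about guaranteeing that a failed task always rejects.

```ts
// A hypothetical, minimal single-slot pool to illustrate the problem.
// If a task's promise never settles (for example, the step crashes without
// rejecting), the `finally` block never runs and the slot is never released,
// so no further work can be claimed. Guaranteeing a rejection avoids this.
class SingleSlotPool {
  private busy = false;
  private queue: Array<() => void> = [];

  async runTask<T>(exec: () => Promise<T>): Promise<T> {
    await this.claimSlot();
    try {
      return await exec();
    } finally {
      // Only reached once the task settles (resolves or rejects)
      this.releaseSlot();
    }
  }

  private claimSlot(): Promise<void> {
    if (!this.busy) {
      this.busy = true;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.queue.push(() => resolve()));
  }

  private releaseSlot() {
    const next = this.queue.shift();
    if (next) {
      next();
    } else {
      this.busy = false;
    }
  }
}
```

A caller that wraps every run in `runTask` keeps processing queued runs only as long as each `exec` eventually resolves or rejects; a promise that never settles wedges the pool, which is exactly the symptom the two integration tests above exercise. The real changes in this PR (in `execute.ts`, `pool.ts`, and `helpers.ts` below) serve the same purpose: make sure every failure path ends in a rejection the pool can observe.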
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.1.4",
"version": "1.1.5",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/call-worker.ts
@@ -51,5 +51,5 @@ export default function initWorkers(

const closeWorkers = async (instant?: boolean) => workers.destroy(instant);

return { callWorker, closeWorkers };
return { callWorker, closeWorkers, workers };
}
17 changes: 12 additions & 5 deletions packages/engine-multi/src/api/execute.ts
@@ -88,6 +88,7 @@ const execute = async (context: ExecutionContext) => {
});
}

let didError = false;
const events = {
[workerEvents.WORKFLOW_START]: (evt: workerEvents.WorkflowStartEvent) => {
workflowStart(context, evt);
@@ -112,6 +113,7 @@
},
// TODO this is also untested
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
didError = true;
error(context, { workflowId: state.plan.id, error: evt.error });
},
};
@@ -122,11 +124,16 @@
events,
workerOptions
).catch((e: any) => {
// TODO are timeout errors being handled nicely here?
// actually I think the occur outside of here, in the pool

error(context, { workflowId: state.plan.id, error: e });
logger.error(`Critical error thrown by ${state.plan.id}`, e);
// An error should:
// a) emit an error event (and so be handled by the error() function
// b) reject the task in the pool
// This guard just ensures that error logging is not duplicated
// if both the above are true (as expected), but that there's still some
// fallback handling if the error event wasn't issued
if (!didError) {
error(context, { workflowId: state.plan.id, error: e });
logger.error(`Critical error thrown by ${state.plan.id}`, e);
}
});
} catch (e: any) {
if (!e.severity) {
1 change: 0 additions & 1 deletion packages/engine-multi/src/api/lifecycle.ts
@@ -148,7 +148,6 @@ export const error = (
event: internalEvents.ErrorEvent
) => {
const { threadId = '-', error } = event;

context.emit(externalEvents.WORKFLOW_ERROR, {
threadId,
// @ts-ignore
3 changes: 2 additions & 1 deletion packages/engine-multi/src/engine.ts
@@ -121,7 +121,7 @@ const createEngine = async (

const engine = new Engine() as EngineAPI;

const { callWorker, closeWorkers } = initWorkers(
const { callWorker, closeWorkers, workers } = initWorkers(
resolvedWorkerPath,
{
maxWorkers: options.maxWorkers,
@@ -239,6 +239,7 @@
execute: executeWrapper,
listen,
destroy,
workers,
});
};

10 changes: 8 additions & 2 deletions packages/engine-multi/src/worker/pool.ts
@@ -87,9 +87,16 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
env: options.env || {},

// This pipes the stderr stream onto the child, so we can read it later
stdio: ['ipc', 'ignore', 'pipe'],
stdio: ['ipc', 'pipe', 'pipe'],
});

// Note: Ok, now I have visibility on the stdout stream
// I don't think I want to send this to gpc
// This might be strictly local debug
// child.stdout!.on('data', (data) => {
// console.log(data.toString());
// });

logger.debug('pool: Created new child process', child.pid);
allWorkers[child.pid!] = child;
} else {
@@ -158,7 +165,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
} catch (e) {
// do nothing
}

reject(new ExitError(code));
finish(worker);
}
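For context on the `stdio` change in this hunk: Node's `child_process` options take an array whose positions map to the child's file descriptors (0 = stdin, 1 = stdout, 2 = stderr), and `'ipc'` opens a message channel on that slot. Switching position 1 from `'ignore'` to `'pipe'` makes the child's stdout readable on the parent as `child.stdout`, which is what the commented-out debug listener above relies on. A standalone sketch (simplified: it uses `'ignore'` for stdin rather than the pool's `'ipc'` channel):

```ts
import { spawn } from 'node:child_process';

// Spawn a child process with stdout piped back to the parent.
// stdio positions: [stdin, stdout, stderr].
const child = spawn(
  process.execPath,
  ['-e', "console.log('hello from the child')"],
  { stdio: ['ignore', 'pipe', 'pipe'] }
);

// With 'ignore' at position 1 instead of 'pipe', child.stdout would be null
// and this listener could never be attached.
child.stdout!.on('data', (chunk: Buffer) => {
  console.log('child stdout:', chunk.toString().trim());
});
```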
43 changes: 22 additions & 21 deletions packages/engine-multi/src/worker/thread/helpers.ts
@@ -1,6 +1,5 @@
// utilities to run inside the worker
// This is designed to minimize the amount of code we have to mock

import process from 'node:process';
import stringify from 'fast-safe-stringify';
import createLogger, { SanitizePolicies } from '@openfn/logger';
@@ -66,17 +65,23 @@ export const createLoggers = (
// Execute wrapper function
export const execute = async (
workflowId: string,
executeFn: () => Promise<any> | undefined
executeFn: () => Promise<any> | undefined,
publishFn = publish
) => {
const handleError = (err: any) => {
publish(workerEvents.ERROR, {
publishFn(workerEvents.ERROR, {
// @ts-ignore
workflowId,
// Map the error out of the thread in a serializable format
error: serializeError(err),
stack: err?.stack
stack: err?.stack,
// TODO job id maybe
});

// Explicitly send a reject task error, to ensure the worker closes down
publish(workerEvents.ENGINE_REJECT_TASK, {
error: serializeError(err),
});
};

process.on('exit', (code: number) => {
@@ -91,39 +96,35 @@
// it'll be ignored (ie the workerEmit call will fail)
process.on('uncaughtException', async (err: any) => {
// Log this error to local stdout. This won't be sent out of the worker thread.
console.debug(`Uncaught exception in worker thread (workflow ${workflowId} )`)
console.debug(err)

console.debug(
`Uncaught exception in worker thread (workflow ${workflowId} )`
);
console.debug(err);

// Also try and log to the workflow's logger
try {
console.error(`Uncaught exception in worker thread (workflow ${workflowId} )`)
console.error(err)
} catch(e) {
console.error(`Uncaught exception in worker thread`)
console.error(
`Uncaught exception in worker thread (workflow ${workflowId} )`
);
console.error(err);
} catch (e) {
console.error(`Uncaught exception in worker thread`);
}

// For now, we'll write this off as a crash-level generic execution error
// TODO did this come from job or adaptor code?
const e = new ExecutionError(err);
e.severity = 'crash'; // Downgrade this to a crash because it's likely not our fault
handleError(e);

// Close down the process just to be 100% sure that all async code stops
// This is in a timeout to give the emitted message time to escape
// There is a TINY WINDOW in which async code can still run and affect the next run
// This should all go away when we replace workerpool
setTimeout(() => {
process.exit(HANDLED_EXIT_CODE);
}, 2);
});

publish(workerEvents.WORKFLOW_START, {
publishFn(workerEvents.WORKFLOW_START, {
workflowId,
});

try {
const result = await executeFn();
publish(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });
publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });

// For tests
return result;
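The new `ENGINE_REJECT_TASK` publish in `handleError` above is the child-thread half of the fix: even if the thread is about to exit, it first tells the parent that the task failed. The parent-side handling sketched below is an assumption made for illustration (the event-name strings and the `watchTask` helper are invented, not the engine's real pool code), but it shows how such a message lets the pool settle the in-flight promise and release the worker slot.

```ts
import { EventEmitter } from 'node:events';

// Illustrative event names: the real values live in the engine's workerEvents
// module, so these strings are assumptions for the sake of a standalone sketch.
const ENGINE_REJECT_TASK = 'engine:reject_task';
const WORKFLOW_COMPLETE = 'workflow:complete';

type WorkerEvent = { type: string; error?: unknown; state?: unknown };

// Assumed parent-side handling: the child publishes ENGINE_REJECT_TASK before
// it exits, giving the parent a chance to settle the in-flight task promise so
// the pool can release the worker slot and claim the next run.
export function watchTask(child: EventEmitter): Promise<unknown> {
  return new Promise((resolve, reject) => {
    child.on('message', (evt: WorkerEvent) => {
      if (evt.type === ENGINE_REJECT_TASK) {
        reject(evt.error);
      } else if (evt.type === WORKFLOW_COMPLETE) {
        resolve(evt.state);
      }
    });
  });
}
```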
(The remaining changed files in this commit are not shown above.)
