diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index e306fbc72..17a115a94 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/integration-tests-worker +## 1.0.40 + +### Patch Changes + +- Updated dependencies + - @openfn/engine-multi@1.1.5 + - @openfn/lightning-mock@2.0.5 + - @openfn/ws-worker@1.1.5 + ## 1.0.39 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 23829c1e5..bb65549e5 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.39", + "version": "1.0.40", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index 3781a3bdf..b55f23614 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -403,40 +403,127 @@ test.serial('an OOM error should still call step-complete', (t) => { }); }); -// test.serial('run a job with complex behaviours (initial state, branching)', (t) => { -// const attempt = { -// id: 'a1', -// initialState: 's1 -// jobs: [ -// { -// id: 'j1', -// body: 'const fn = (f) => (state) => f(state); fn(() => ({ data: { answer: 42} }))', -// }, -// ], -// } - -// initLightning(); -// lightning.on('run:complete', (evt) => { -// // This will fetch the final dataclip from the attempt -// const result = lightning.getResult('a1'); -// t.deepEqual(result, { data: { answer: 42 } }); - -// t.pass('completed attempt'); -// done(); -// }); -// initWorker(); - -// lightning.enqueueRun({ -// id: 'a1', -// jobs: [ -// { -// id: 'j1', -// body: 'const fn = (f) => (state) => f(state); fn(() => ({ data: { answer: 42} }))', -// }, -// ], -// }); -// }); -// }); +// https://github.com/OpenFn/kit/pull/668 +// This test relies on a capacity of 1 +test.serial( + 'keep claiming work after a run with an uncaught exception', + (t) => { + return new Promise(async (done) => { + const finished: Record = {}; + + const onComplete = (evt) => { + const id = evt.runId; + finished[id] = true; + + if (id === 'a20') { + t.is(evt.payload.reason, 'crash'); + } + if (id === 'a21') { + t.is(evt.payload.reason, 'success'); + } + + if (finished.a20 && finished.a21) { + t.pass('both runs completed'); + done(); + } + }; + + lightning.on('run:complete', onComplete); + + const body = ` +fn( + () => new Promise(() => { + setTimeout(() => { + throw new Error('uncaught') + }, 1) + }) +) +`; + + lightning.enqueueRun({ + id: 'a20', + jobs: [ + { + id: 'j1', + adaptor: '@openfn/language-common@latest', + body, + }, + ], + }); + + lightning.enqueueRun({ + id: 'a21', + jobs: [ + { + id: 'j2', + adaptor: '@openfn/language-common@latest', + body: 'fn(() => ({ data: { answer: 42} }))', + }, + ], + }); + }); + } +); + +// https://github.com/OpenFn/kit/pull/668 +// This test relies on a capacity of 1 +test.serial('keep claiming work after a run with a process.exit', (t) => { + return new Promise(async (done) => { + const finished: Record = {}; + + const onComplete = (evt) => { + const id = evt.runId; + finished[id] = true; + + if (id === 'a30') { + t.is(evt.payload.reason, 'crash'); + } + if (id === 'a31') { + t.is(evt.payload.reason, 'success'); + } + + if (finished.a30 && finished.a31) { + t.pass('both runs completed'); + done(); + } + }; + + lightning.on('run:complete', onComplete); + + const body = ` +fn( + () => new Promise(() => { + setTimeout(() => { + process.exit() + }, 1) + }) +) +`; + + lightning.enqueueRun({ + id: 'a30', + jobs: [ + { + id: 'j1', + adaptor: '@openfn/language-common@latest', + body, + }, + ], + }); + + lightning.enqueueRun({ + id: 'a31', + jobs: [ + { + id: 'j2', + adaptor: '@openfn/language-common@latest', + body: 'fn(() => ({ data: { answer: 42} }))', + }, + ], + }); + }); +}); + test.serial("Don't send job logs to stdout", (t) => { return new Promise(async (done) => { const attempt = { diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 2702919c0..d0dc5114d 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,11 @@ # engine-multi +## 1.1.5 + +### Patch Changes + +- Fix an issue where failed steps might not error correctly, stopping the pool from reclaiming the slot + ## 1.1.4 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 8eeebf2b6..d9f15ceb2 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.1.4", + "version": "1.1.5", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index 9306b7101..4215091c9 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -51,5 +51,5 @@ export default function initWorkers( const closeWorkers = async (instant?: boolean) => workers.destroy(instant); - return { callWorker, closeWorkers }; + return { callWorker, closeWorkers, workers }; } diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index f86c996ba..dfc713b24 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -88,6 +88,7 @@ const execute = async (context: ExecutionContext) => { }); } + let didError = false; const events = { [workerEvents.WORKFLOW_START]: (evt: workerEvents.WorkflowStartEvent) => { workflowStart(context, evt); @@ -112,6 +113,7 @@ const execute = async (context: ExecutionContext) => { }, // TODO this is also untested [workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => { + didError = true; error(context, { workflowId: state.plan.id, error: evt.error }); }, }; @@ -122,11 +124,16 @@ const execute = async (context: ExecutionContext) => { events, workerOptions ).catch((e: any) => { - // TODO are timeout errors being handled nicely here? - // actually I think the occur outside of here, in the pool - - error(context, { workflowId: state.plan.id, error: e }); - logger.error(`Critical error thrown by ${state.plan.id}`, e); + // An error should: + // a) emit an error event (and so be handled by the error() function + // b) reject the task in the pool + // This guard just ensures that error logging is not duplicated + // if both the above are true (as expected), but that there's still some + // fallback handling if the error event wasn't issued + if (!didError) { + error(context, { workflowId: state.plan.id, error: e }); + logger.error(`Critical error thrown by ${state.plan.id}`, e); + } }); } catch (e: any) { if (!e.severity) { diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index 7b66ce6ae..d6b52fd6f 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -148,7 +148,6 @@ export const error = ( event: internalEvents.ErrorEvent ) => { const { threadId = '-', error } = event; - context.emit(externalEvents.WORKFLOW_ERROR, { threadId, // @ts-ignore diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index ad868a40e..7a3c4596e 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -121,7 +121,7 @@ const createEngine = async ( const engine = new Engine() as EngineAPI; - const { callWorker, closeWorkers } = initWorkers( + const { callWorker, closeWorkers, workers } = initWorkers( resolvedWorkerPath, { maxWorkers: options.maxWorkers, @@ -239,6 +239,7 @@ const createEngine = async ( execute: executeWrapper, listen, destroy, + workers, }); }; diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 5e94f05b7..630a4bd42 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -87,9 +87,16 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { env: options.env || {}, // This pipes the stderr stream onto the child, so we can read it later - stdio: ['ipc', 'ignore', 'pipe'], + stdio: ['ipc', 'pipe', 'pipe'], }); + // Note: Ok, now I have visibility on the stdout stream + // I don't think I want to send this to gpc + // This might be strictly local debug + // child.stdout!.on('data', (data) => { + // console.log(data.toString()); + // }); + logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; } else { @@ -158,7 +165,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { } catch (e) { // do nothing } - reject(new ExitError(code)); finish(worker); } diff --git a/packages/engine-multi/src/worker/thread/helpers.ts b/packages/engine-multi/src/worker/thread/helpers.ts index 06c924a44..492b2674d 100644 --- a/packages/engine-multi/src/worker/thread/helpers.ts +++ b/packages/engine-multi/src/worker/thread/helpers.ts @@ -1,6 +1,5 @@ // utilities to run inside the worker // This is designed to minimize the amount of code we have to mock - import process from 'node:process'; import stringify from 'fast-safe-stringify'; import createLogger, { SanitizePolicies } from '@openfn/logger'; @@ -66,17 +65,23 @@ export const createLoggers = ( // Execute wrapper function export const execute = async ( workflowId: string, - executeFn: () => Promise | undefined + executeFn: () => Promise | undefined, + publishFn = publish ) => { const handleError = (err: any) => { - publish(workerEvents.ERROR, { + publishFn(workerEvents.ERROR, { // @ts-ignore workflowId, // Map the error out of the thread in a serializable format error: serializeError(err), - stack: err?.stack + stack: err?.stack, // TODO job id maybe }); + + // Explicitly send a reject task error, to ensure the worker closes down + publish(workerEvents.ENGINE_REJECT_TASK, { + error: serializeError(err), + }); }; process.on('exit', (code: number) => { @@ -91,15 +96,19 @@ export const execute = async ( // it'll be ignored (ie the workerEmit call will fail) process.on('uncaughtException', async (err: any) => { // Log this error to local stdout. This won't be sent out of the worker thread. - console.debug(`Uncaught exception in worker thread (workflow ${workflowId} )`) - console.debug(err) - + console.debug( + `Uncaught exception in worker thread (workflow ${workflowId} )` + ); + console.debug(err); + // Also try and log to the workflow's logger try { - console.error(`Uncaught exception in worker thread (workflow ${workflowId} )`) - console.error(err) - } catch(e) { - console.error(`Uncaught exception in worker thread`) + console.error( + `Uncaught exception in worker thread (workflow ${workflowId} )` + ); + console.error(err); + } catch (e) { + console.error(`Uncaught exception in worker thread`); } // For now, we'll write this off as a crash-level generic execution error @@ -107,23 +116,15 @@ export const execute = async ( const e = new ExecutionError(err); e.severity = 'crash'; // Downgrade this to a crash because it's likely not our fault handleError(e); - - // Close down the process just to be 100% sure that all async code stops - // This is in a timeout to give the emitted message time to escape - // There is a TINY WINDOW in which async code can still run and affect the next run - // This should all go away when we replace workerpool - setTimeout(() => { - process.exit(HANDLED_EXIT_CODE); - }, 2); }); - publish(workerEvents.WORKFLOW_START, { + publishFn(workerEvents.WORKFLOW_START, { workflowId, }); try { const result = await executeFn(); - publish(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result }); + publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result }); // For tests return result; diff --git a/packages/engine-multi/test/errors.test.ts b/packages/engine-multi/test/errors.test.ts index 26849678d..8ba54cd20 100644 --- a/packages/engine-multi/test/errors.test.ts +++ b/packages/engine-multi/test/errors.test.ts @@ -3,7 +3,7 @@ import path from 'node:path'; import { createMockLogger } from '@openfn/logger'; import createEngine, { EngineOptions } from '../src/engine'; -import { WORKFLOW_ERROR } from '../src/events'; +import { WORKFLOW_ERROR, WORKFLOW_COMPLETE } from '../src/events'; import type { RuntimeEngine } from '../src/types'; let engine: RuntimeEngine; @@ -138,10 +138,7 @@ test.serial.skip('vm oom error', (t) => { }); }); -// https://github.com/OpenFn/kit/issues/509 -// TODO this passes standalone, but will trigger an exception in the next test -// This should start working again once we spin up the worker thread -test.serial.skip('execution error from async code', (t) => { +test.serial('execution error from async code', (t) => { return new Promise((done) => { const plan = { id: 'e', @@ -153,7 +150,7 @@ test.serial.skip('execution error from async code', (t) => { // In which case it'll be ignored // Also note that the wrapping promise will never resolve expression: `export default [(s) => new Promise((r) => { - setTimeout(() => { throw new Error(\"e1324\"); r() }, 10) + setTimeout(() => { throw new Error(\"err\"); r() }, 10) })]`, }, ], @@ -170,6 +167,46 @@ test.serial.skip('execution error from async code', (t) => { }); }); +test.serial('after uncaught exception, free up the pool', (t) => { + const plan1 = { + id: 'e', + workflow: { + steps: [ + { + expression: `export default [(s) => new Promise((r) => { + setTimeout(() => { throw new Error(\"err\"); r() }, 10) + })]`, + }, + ], + }, + options: {}, + }; + const plan2 = { + id: 'a', + workflow: { + steps: [ + { + expression: `export default [(s) => s]`, + }, + ], + }, + options: {}, + }; + + return new Promise((done) => { + engine.execute(plan1, {}).on(WORKFLOW_ERROR, (evt) => { + t.log('First workflow failed'); + t.is(evt.type, 'ExecutionError'); + t.is(evt.severity, 'crash'); + + engine.execute(plan2, {}).on(WORKFLOW_COMPLETE, () => { + t.log('Second workflow completed'); + done(); + }); + }); + }); +}); + test.serial('emit a crash error on process.exit()', (t) => { return new Promise((done) => { const plan = { diff --git a/packages/engine-multi/test/worker/helper.test.ts b/packages/engine-multi/test/worker/helper.test.ts index 445fac3ed..69ff74502 100644 --- a/packages/engine-multi/test/worker/helper.test.ts +++ b/packages/engine-multi/test/worker/helper.test.ts @@ -1,6 +1,7 @@ import test from 'ava'; +import * as workerEvents from '../../src/worker/events'; -import { createLoggers } from '../../src/worker/thread/helpers'; +import { execute, createLoggers } from '../../src/worker/thread/helpers'; test('createLogger: runtime logger should emit an event on log', (t) => { const message = 'testing1234'; @@ -40,3 +41,44 @@ test('createLogger: runtime logger should emit a nicely serialised error on log' logger.log(message); }); + +test('execute: should call the run function', (t) => { + let didCallRun = false; + + const run = async () => { + // do something + didCallRun = true; + }; + + execute('abc', run); + + t.true(didCallRun); +}); + +test('execute: should publish workflow-start', async (t) => { + let event; + + const publish = (eventName: string, payload: any) => { + if (eventName === workerEvents.WORKFLOW_START) { + event = payload; + } + }; + + await execute('abc', async () => {}, publish); + + t.deepEqual(event, { workflowId: 'abc' }); +}); + +test('execute: should publish workflow-complete', async (t) => { + let event; + + const publish = (eventName: string, payload: any) => { + if (eventName === workerEvents.WORKFLOW_COMPLETE) { + event = payload; + } + }; + + await execute('abc', async () => ({}), publish); + + t.deepEqual(event, { workflowId: 'abc', state: {} }); +}); diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index 605ea2805..80a4e9f25 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/lightning-mock +## 2.0.5 + +### Patch Changes + +- Updated dependencies + - @openfn/engine-multi@1.1.5 + ## 2.0.4 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 99079beba..8bdc5ab3d 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.4", + "version": "2.0.5", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 10f2a900f..8c4003924 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,12 @@ # ws-worker +## 1.1.5 + +### Patch Changes + +- Updated dependencies + - @openfn/engine-multi@1.1.5 + ## 1.1.4 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index f9efd0fd5..908bd5ec5 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.1.4", + "version": "1.1.5", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",