diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 354459683..834a16de2 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-worker +## 1.0.56 + +### Patch Changes + +- Updated dependencies [ca07db4] + - @openfn/ws-worker@1.6.1 + ## 1.0.55 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index b7155b990..63167ed0d 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.55", + "version": "1.0.56", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index 64f6b3a9c..72083290f 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -53,6 +53,31 @@ test('crash: syntax error', async (t) => { t.regex(error_message, /Unexpected token \(1:9\)$/); }); +// https://github.com/OpenFn/kit/issues/758 +test('crash: job not found', async (t) => { + lightning.addDataclip('x', {}); + + const attempt = { + id: crypto.randomUUID(), + jobs: [ + { + id: 'x', + adaptor: '@openfn/language-common@latest', + body: 'fn(s => s)', + }, + ], + dataclip_id: 'x', // having a data clip is important to trigger the crash + starting_node_id: 'y', + }; + + const result = await run(attempt); + + const { reason, error_type, error_message } = result; + t.is(reason, 'crash'); + t.is(error_type, 'ValidationError'); + t.regex(error_message, /could not find start job: y/i); +}); + test('exception: autoinstall error', async (t) => { const attempt = { id: crypto.randomUUID(), diff --git a/packages/lightning-mock/src/start.ts b/packages/lightning-mock/src/start.ts index 5b746a8c7..9f78a12b2 100644 --- a/packages/lightning-mock/src/start.ts +++ b/packages/lightning-mock/src/start.ts @@ -28,10 +28,14 @@ const args = yargs(hideBin(process.argv)) const logger = createLogger('LNG', { level: args.log }); -createLightningServer({ +const server = createLightningServer({ port: args.port, logger, logLevel: args.log, }); +// add a default credential +server.addCredential('c', { user: 'user ' }); +server.addDataclip('d', { data: {} }); + logger.success('Started mock Lightning server on ', args.port); diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 945d25ee2..b05ad2ed0 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,11 @@ # ws-worker +## 1.6.1 + +### Patch Changes + +- ca07db4: Fix an issue where a run with a missing start node caused the server to crash + ## 1.6.0 ### Minor Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index c18fa89bb..30483fbfe 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.6.0", + "version": "1.6.1", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index 3f2295c47..fa2743c64 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -30,8 +30,6 @@ const destroy = async (app: ServerApp, logger: Logger) => { await app.engine.destroy(); app.socket?.disconnect(); - logger.info('Server closed....'); - resolve(); }), ]); diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index ac3dc0ec1..52753bf16 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -149,7 +149,6 @@ export function execute( loadedInput = await loadDataclip(channel, input); logger.success('dataclip loaded'); } catch (e: any) { - // abort with error return handleRunError(context, { workflowId: plan.id!, message: `Failed to load dataclip ${input}${ diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 5d75feff3..6d52f389a 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -110,11 +110,13 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { if (app.killWorkloop) { app.killWorkloop(); delete app.killWorkloop; - logger.info('Connection to lightning lost.'); - logger.info( - 'Worker will automatically reconnect when lightning is back online.' - ); - // So far as I know, the socket will try and reconnect in the background forever + if (!app.destroyed) { + logger.info('Connection to lightning lost'); + logger.info( + 'Worker will automatically reconnect when lightning is back online' + ); + // So far as I know, the socket will try and reconnect in the background forever + } } }; @@ -180,6 +182,9 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { if (app.socket) { app.workflows[id] = true; + // TODO if we fail to join the run channel, the whole server + // will crash. Really we should just abort the run somehow + // Maybe even soft shutdown the worker const { channel: runChannel, plan, diff --git a/packages/ws-worker/src/util/create-run-state.ts b/packages/ws-worker/src/util/create-run-state.ts index 7227da833..460a28027 100644 --- a/packages/ws-worker/src/util/create-run-state.ts +++ b/packages/ws-worker/src/util/create-run-state.ts @@ -25,14 +25,16 @@ export default (plan: ExecutionPlan, input?: Lazy): RunState => { startNode = jobs.find(({ id }) => id === plan.options.start)!; } - // TODO throw with validation error of some kind if this node could not be found - const initialRuns: string[] = []; - // If this is a trigger, get the downstream jobs - if (!startNode.expression) { - initialRuns.push(...Object.keys(startNode.next!)); - } else { - initialRuns.push(startNode.id!); + // Note that the workflow hasn't been properly validated yet + // and it's technically possible that there is no start node + if (startNode) { + // If this is a trigger, get the downstream jobs + if (!startNode.expression) { + initialRuns.push(...Object.keys(startNode.next!)); + } else { + initialRuns.push(startNode.id!); + } } // For any runs downstream of the initial state, diff --git a/packages/ws-worker/test/util/create-run-state.test.ts b/packages/ws-worker/test/util/create-run-state.test.ts index 7424ee957..31ad59577 100644 --- a/packages/ws-worker/test/util/create-run-state.test.ts +++ b/packages/ws-worker/test/util/create-run-state.test.ts @@ -98,3 +98,24 @@ test('Set initial input dataclip with a trigger with multiple downstream jobs', t.deepEqual(run.inputDataclips, { a: 's', b: 's', c: 's' }); }); + +test("Do not throw if the start step doesn't exist", (t) => { + const plan = createPlan([{ id: 'a' }]); + plan.options.start = 'wibble'; + const input = 'x'; + + createRunState(plan, input); + + t.pass('did not throw'); +}); + +test('Do not throw if there are no steps', (t) => { + const plan = createPlan([{ id: 'a' }]); + plan.workflow.steps = []; + + const input = 'x'; + + createRunState(plan, input); + + t.pass('did not throw'); +});