diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index d165860c7..46f113e69 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-worker +## 1.0.59 + +### Patch Changes + +- Updated dependencies [0cf7198] + - @openfn/ws-worker@1.6.4 + ## 1.0.58 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 5db029a02..e4893869e 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.58", + "version": "1.0.59", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/lightning-mock/src/api-sockets.ts b/packages/lightning-mock/src/api-sockets.ts index 6a4725c7d..2ece130af 100644 --- a/packages/lightning-mock/src/api-sockets.ts +++ b/packages/lightning-mock/src/api-sockets.ts @@ -392,7 +392,7 @@ const createSocketAPI = ( evt: PhoenixEvent ) { const { ref, join_ref, topic } = evt; - const { step_id, job_id, input_dataclip_id } = evt.payload; + const { step_id, job_id } = evt.payload; const [_, runId] = topic.split(':'); if (!state.dataclips) { @@ -414,11 +414,6 @@ const createSocketAPI = ( status: 'error', response: 'no job_id', }; - } else if (!input_dataclip_id) { - payload = { - status: 'error', - response: 'no input_dataclip_id', - }; } ws.reply({ diff --git a/packages/lightning-mock/test/events/step-start.test.ts b/packages/lightning-mock/test/events/step-start.test.ts index 3f1924905..9ed7f88f0 100644 --- a/packages/lightning-mock/test/events/step-start.test.ts +++ b/packages/lightning-mock/test/events/step-start.test.ts @@ -95,7 +95,7 @@ test.serial('error if no job_id', async (t) => { }); }); -test.serial('error if no input_dataclip_id', async (t) => { +test.serial('no error if no input_dataclip_id', async (t) => { return new Promise(async (done) => { const run = createRun(); @@ -103,15 +103,15 @@ test.serial('error if no input_dataclip_id', async (t) => { const event = { job_id: 'a', - step_id: 'r:a', - input_dataclip_id: undefined, + step_id: 'r:a' }; const channel = await join(client, run.id); - channel.push(STEP_START, event).receive('error', () => { - t.pass('event rejected'); + channel.push(STEP_START, event).receive('ok', () => { + t.pass('event accepted'); done(); }); }); }); + diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index aed7257a9..2674a05a1 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,11 @@ # ws-worker +## 1.6.4 + +### Patch Changes + +- 0cf7198: Do not send the input_dataclip_id in step:start if the dataclip was witheld + ## 1.6.3 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 01f167394..9456405ac 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.6.3", + "version": "1.6.4", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 54918628d..5307ee2dd 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -64,6 +64,7 @@ export default async function onStepComplete( } evt.output_dataclip_id = dataclipId; } catch (e) { + state.withheldDataclips[dataclipId] = true; evt.output_dataclip_error = 'DATACLIP_TOO_LARGE'; const time = (timestamp() - BigInt(10e6)).toString(); diff --git a/packages/ws-worker/src/events/step-start.ts b/packages/ws-worker/src/events/step-start.ts index ad5523483..bcfa8aa6a 100644 --- a/packages/ws-worker/src/events/step-start.ts +++ b/packages/ws-worker/src/events/step-start.ts @@ -18,10 +18,14 @@ export default async function onStepStart( const input_dataclip_id = state.inputDataclips[event.jobId]; - await sendEvent(channel, STEP_START, { + const evt: StepStartPayload = { step_id: state.activeStep!, job_id: state.activeJob!, - input_dataclip_id, timestamp: timeInMicroseconds(event.time), - }); + }; + if (!state.withheldDataclips[input_dataclip_id]) { + evt.input_dataclip_id = input_dataclip_id; + } + + await sendEvent(channel, STEP_START, evt); } diff --git a/packages/ws-worker/src/types.d.ts b/packages/ws-worker/src/types.d.ts index a9d09fbeb..974a79439 100644 --- a/packages/ws-worker/src/types.d.ts +++ b/packages/ws-worker/src/types.d.ts @@ -14,6 +14,8 @@ export type RunState = { // For each run, map the input ids // TODO better name maybe? inputDataclips: Record; + // If for any reason a dataclip was not sent to lightning, track it + withheldDataclips: Record; reasons: Record; // final dataclip id diff --git a/packages/ws-worker/src/util/create-run-state.ts b/packages/ws-worker/src/util/create-run-state.ts index 460a28027..bb30fa05a 100644 --- a/packages/ws-worker/src/util/create-run-state.ts +++ b/packages/ws-worker/src/util/create-run-state.ts @@ -6,6 +6,7 @@ export default (plan: ExecutionPlan, input?: Lazy): RunState => { lastDataclipId: '', dataclips: {}, inputDataclips: {}, + withheldDataclips: {}, reasons: {}, plan, input, diff --git a/packages/ws-worker/src/util/timestamp.ts b/packages/ws-worker/src/util/timestamp.ts index bc78e3fb7..d49d209ae 100644 --- a/packages/ws-worker/src/util/timestamp.ts +++ b/packages/ws-worker/src/util/timestamp.ts @@ -1,2 +1,4 @@ +import { TimeInMicroSeconds } from '@openfn/lexicon/lightning'; + export const timeInMicroseconds = (time?: bigint) => - time && (BigInt(time) / BigInt(1e3)).toString(); + (time && (BigInt(time) / BigInt(1e3)).toString()) as TimeInMicroSeconds; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index ef4d6f42c..4cc53c750 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -206,6 +206,9 @@ test('do not include dataclips in step:complete if output_dataclip is too big', const channel = mockChannel({ [RUN_LOG]: () => true, [STEP_COMPLETE]: (evt: StepCompletePayload) => { + const clipId = state.inputDataclips['a']; + t.true(state.withheldDataclips[clipId]) + t.falsy(evt.output_dataclip_id); t.falsy(evt.output_dataclip); t.is(evt.output_dataclip_error, 'DATACLIP_TOO_LARGE'); diff --git a/packages/ws-worker/test/events/step-start.test.ts b/packages/ws-worker/test/events/step-start.test.ts index c75a1034f..b35099526 100644 --- a/packages/ws-worker/test/events/step-start.test.ts +++ b/packages/ws-worker/test/events/step-start.test.ts @@ -60,6 +60,40 @@ test('send a step:start event', async (t) => { await handleStepStart({ channel, state } as any, event); }); +test('if the input dataclip was withheld, do not send it', async (t) => { + const plan = { + id: 'run-1', + workflow: { + steps: [ + { id: 'job-1', expression: '.' }, + { id: 'job-2', expression: '.' }, + ], + }, + options: {}, + }; + const input = 'abc'; + const jobId = 'job-1'; + + const state = createRunState(plan, input); + state.activeJob = jobId; + state.activeStep = 'b'; + + // register and withhold the dataclip + state.withheldDataclips['abc'] = true + state.inputDataclips[jobId] = 'abc'; + + const channel = mockChannel({ + [STEP_START]: (evt) => { + t.falsy(evt.input_dataclip_id, input); + return true; + }, + [RUN_LOG]: () => true, + }); + + const event = { jobId } as any; + await handleStepStart({ channel, state } as any, event); +}); + test('should include a timestamp', async (t) => { const plan = { id: 'run-1',