Skip to content

Commit

Permalink
Worker: do not send input_dataclip_id if the previous output was too …
Browse files Browse the repository at this point in the history
…large (#774)

* lightning-mock: don't throw if no input_dataclip_id on step:start

* worker: do not send the input_dataclip_id in step:start if the dataclip was withheld

* format

* version: ws-worker@1.6.4

* remove .only

* fix typo
  • Loading branch information
josephjclark authored Sep 19, 2024
1 parent 98122be commit f33a865
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 17 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.59

### Patch Changes

- Updated dependencies [0cf7198]
- @openfn/ws-worker@1.6.4

## 1.0.58

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.58",
"version": "1.0.59",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
7 changes: 1 addition & 6 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ const createSocketAPI = (
evt: PhoenixEvent<StepStartPayload>
) {
const { ref, join_ref, topic } = evt;
const { step_id, job_id, input_dataclip_id } = evt.payload;
const { step_id, job_id } = evt.payload;

const [_, runId] = topic.split(':');
if (!state.dataclips) {
Expand All @@ -414,11 +414,6 @@ const createSocketAPI = (
status: 'error',
response: 'no job_id',
};
} else if (!input_dataclip_id) {
payload = {
status: 'error',
response: 'no input_dataclip_id',
};
}

ws.reply<StepStartReply>({
Expand Down
10 changes: 5 additions & 5 deletions packages/lightning-mock/test/events/step-start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,23 @@ test.serial('error if no job_id', async (t) => {
});
});

test.serial('error if no input_dataclip_id', async (t) => {
test.serial('no error if no input_dataclip_id', async (t) => {
return new Promise(async (done) => {
const run = createRun();

server.startRun(run.id);

const event = {
job_id: 'a',
step_id: 'r:a',
input_dataclip_id: undefined,
step_id: 'r:a'
};

const channel = await join(client, run.id);

channel.push(STEP_START, event).receive('error', () => {
t.pass('event rejected');
channel.push(STEP_START, event).receive('ok', () => {
t.pass('event accepted');
done();
});
});
});

6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.6.4

### Patch Changes

- 0cf7198: Do not send the input_dataclip_id in step:start if the dataclip was withheld

## 1.6.3

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.6.3",
"version": "1.6.4",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export default async function onStepComplete(
}
evt.output_dataclip_id = dataclipId;
} catch (e) {
state.withheldDataclips[dataclipId] = true;
evt.output_dataclip_error = 'DATACLIP_TOO_LARGE';

const time = (timestamp() - BigInt(10e6)).toString();
Expand Down
10 changes: 7 additions & 3 deletions packages/ws-worker/src/events/step-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ export default async function onStepStart(

const input_dataclip_id = state.inputDataclips[event.jobId];

await sendEvent<StepStartPayload>(channel, STEP_START, {
const evt: StepStartPayload = {
step_id: state.activeStep!,
job_id: state.activeJob!,
input_dataclip_id,
timestamp: timeInMicroseconds(event.time),
});
};
if (!state.withheldDataclips[input_dataclip_id]) {
evt.input_dataclip_id = input_dataclip_id;
}

await sendEvent<StepStartPayload>(channel, STEP_START, evt);
}
2 changes: 2 additions & 0 deletions packages/ws-worker/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type RunState = {
// For each run, map the input ids
// TODO better name maybe?
inputDataclips: Record<string, string>;
// If for any reason a dataclip was not sent to lightning, track it
withheldDataclips: Record<string, true>;
reasons: Record<string, ExitReason>;

// final dataclip id
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/util/create-run-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export default (plan: ExecutionPlan, input?: Lazy<State>): RunState => {
lastDataclipId: '',
dataclips: {},
inputDataclips: {},
withheldDataclips: {},
reasons: {},
plan,
input,
Expand Down
4 changes: 3 additions & 1 deletion packages/ws-worker/src/util/timestamp.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
import { TimeInMicroSeconds } from '@openfn/lexicon/lightning';

export const timeInMicroseconds = (time?: bigint) =>
time && (BigInt(time) / BigInt(1e3)).toString();
(time && (BigInt(time) / BigInt(1e3)).toString()) as TimeInMicroSeconds;
3 changes: 3 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ test('do not include dataclips in step:complete if output_dataclip is too big',
const channel = mockChannel({
[RUN_LOG]: () => true,
[STEP_COMPLETE]: (evt: StepCompletePayload) => {
const clipId = state.inputDataclips['a'];
t.true(state.withheldDataclips[clipId])

t.falsy(evt.output_dataclip_id);
t.falsy(evt.output_dataclip);
t.is(evt.output_dataclip_error, 'DATACLIP_TOO_LARGE');
Expand Down
34 changes: 34 additions & 0 deletions packages/ws-worker/test/events/step-start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,40 @@ test('send a step:start event', async (t) => {
await handleStepStart({ channel, state } as any, event);
});

// If a step's input dataclip is flagged in state.withheldDataclips, the
// step:start payload must not carry an input_dataclip_id for it.
test('if the input dataclip was withheld, do not send it', async (t) => {
  const plan = {
    id: 'run-1',
    workflow: {
      steps: [
        { id: 'job-1', expression: '.' },
        { id: 'job-2', expression: '.' },
      ],
    },
    options: {},
  };
  // The same id is used as the run input and as job-1's input dataclip
  const input = 'abc';
  const jobId = 'job-1';

  const state = createRunState(plan, input);
  state.activeJob = jobId;
  state.activeStep = 'b';

  // register and withhold the dataclip
  state.withheldDataclips[input] = true;
  state.inputDataclips[jobId] = input;

  const channel = mockChannel({
    [STEP_START]: (evt) => {
      // The second argument is the assertion message, so describe the
      // expectation rather than passing the dataclip id
      t.falsy(
        evt.input_dataclip_id,
        'input_dataclip_id should not be sent for a withheld dataclip'
      );
      return true;
    },
    [RUN_LOG]: () => true,
  });

  const event = { jobId } as any;
  await handleStepStart({ channel, state } as any, event);
});

test('should include a timestamp', async (t) => {
const plan = {
id: 'run-1',
Expand Down

0 comments on commit f33a865

Please sign in to comment.