Skip to content

Commit

Permalink
Merge pull request #518 from OpenFn/complete-errors
Browse files Browse the repository at this point in the history
Call run:complete if a workflow errors
  • Loading branch information
josephjclark authored Dec 1, 2023
2 parents 0c9a5dd + eb24a58 commit 4bad96e
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 70 deletions.
59 changes: 59 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,65 @@ test('blacklist a non-openfn adaptor', (t) => {
});
});

test('a timeout error should still call run-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest', // version lock to something stable?
body: 'fn((s) => new Promise((resolve) => setTimeout(() => resolve(s), 1000)))',
},
],
options: {
// Including the timeout here stops the attempt returning at all
timeout: 100,
},
};

lightning.once('run:complete', (event) => {
t.is(event.payload.reason, 'kill');
});

lightning.once('attempt:complete', (event) => {
done();
});

lightning.enqueueAttempt(attempt);
});
});

test('an OOM error should still call run-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest', // version lock to something stable?
body: `
fn((s) => {
s.data = [];
while(true) {
s.data.push(new Array(1e5).fill("xyz"))
}
return s;
})`,
},
],
};

lightning.once('run:complete', (event) => {
t.is(event.payload.reason, 'kill');
});

lightning.once('attempt:complete', (event) => {
done();
});

lightning.enqueueAttempt(attempt);
});
});

// test('run a job with complex behaviours (initial state, branching)', (t) => {
// const attempt = {
// id: 'a1',
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class EngineError extends Error {
// This is thrown if a workflow takes too long to run
// It is generated by workerpool and thrown if the workerpool promise fails to resolve
export class TimeoutError extends EngineError {
severity = 'crash';
severity = 'kill';
type = 'TimeoutError';
duration;
constructor(durationInMs: number) {
Expand Down
12 changes: 1 addition & 11 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,21 +419,11 @@ const createSocketAPI = (

let payload: any = validateReasons(evt.payload);

if (!output_dataclip) {
payload = {
status: 'error',
response: 'no output_dataclip',
};
} else if (output_dataclip_id) {
if (output_dataclip_id && output_dataclip) {
if (!state.dataclips) {
state.dataclips = {};
}
state.dataclips[output_dataclip_id] = JSON.parse(output_dataclip!);
} else {
payload = {
status: 'error',
response: 'no output_dataclip_id',
};
}

// be polite and acknowledge the event
Expand Down
40 changes: 0 additions & 40 deletions packages/lightning-mock/test/events/run-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,46 +52,6 @@ test.serial('save dataclip id to state', async (t) => {
});
});

test.serial('error if no dataclip', async (t) => {
return new Promise(async (done) => {
const attempt = createAttempt();

server.startAttempt(attempt.id);

const event = {
reason: 'success',
output_dataclip: null,
output_dataclip_id: 't',
};
const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('error', () => {
t.pass('event rejected');
done();
});
});
});

test.serial('error if no dataclip_d', async (t) => {
return new Promise(async (done) => {
const attempt = createAttempt();

server.startAttempt(attempt.id);

const event = {
reason: 'success',
output_dataclip: JSON.stringify({ x: 22 }),
output_dataclip_id: undefined,
};
const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('error', () => {
t.pass('event rejected');
done();
});
});
});

test.serial('error if no reason', async (t) => {
return new Promise(async (done) => {
const attempt = createAttempt();
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ export class SecurityError extends RTError {
export class TimeoutError extends RTError {
type = 'TimeoutError';
name = 'TimeoutError';
severity = 'crash';
severity = 'kill';
message: string;
constructor(duration: number) {
super();
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ test('crash on timeout', async (t) => {
}

t.truthy(error);
t.is(error.severity, 'crash');
t.is(error.severity, 'kill');
t.is(error.type, 'TimeoutError');
t.is(error.message, 'Job took longer than 1ms to complete');
});
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/test/execute/plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ test('keep executing after an error', async (t) => {
t.falsy(result.x);
});

test.only('simple on-error handler', async (t) => {
test('simple on-error handler', async (t) => {
const plan: ExecutionPlan = {
jobs: [
{
Expand Down
20 changes: 12 additions & 8 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export function onJobError(context: Context, event: any) {
// because it'll count it as a crash
// This isn't very good: maybe we shouldn't trigger an error
// at all for a fail state?
const { state, error, jobId } = event;
const { state = {}, error, jobId } = event;
// This test is horrible too
if (state.errors?.[jobId]?.message === error.message) {
onJobComplete(context, event);
Expand Down Expand Up @@ -277,19 +277,21 @@ export async function onWorkflowComplete(
onFinish({ reason, state: result });
}

// On error, for now, we just post to workflow complete
// No unit tests on this (not least because I think it'll change soon)
// NB this is a crash state!
export async function onWorkflowError(
{ state, channel, logger, onFinish }: Context,
context: Context,
event: WorkflowErrorPayload
) {
// Should we not just report this reason?
// Nothing more severe can have happened downstream, right?
// const reason = calculateAttemptExitReason(state);
const { state, channel, logger, onFinish } = context;

try {
// Ok, let's try that, let's just generate a reason from the event
const reason = calculateJobExitReason('', { data: {} }, event);

// If there's a job still running, make sure it gets marked complete
if (state.activeJob) {
await onJobError(context, { error: event });
}

await sendEvent<AttemptCompletePayload>(channel, ATTEMPT_COMPLETE, {
final_dataclip_id: state.lastDataclipId!,
...reason,
Expand All @@ -299,6 +301,8 @@ export async function onWorkflowError(
} catch (e: any) {
logger.error('ERROR in workflow-error handler:', e.message);
logger.error(e);

onFinish({});
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,17 @@ async function createMock() {

try {
await run(xplan, undefined, opts as any);
dispatch('workflow-complete', { workflowId: id, threadId: threadId });
} catch (e: any) {
dispatch('workflow-error', {
threadId: threadId,
workflowId: id,
type: e.name,
message: e.message,
});
} finally {
delete activeWorkflows[id!];
}

delete activeWorkflows[id!];
dispatch('workflow-complete', { workflowId: id, threadId: threadId });
}, 1);

// Technically the engine should return an event emitter
Expand Down
115 changes: 114 additions & 1 deletion packages/ws-worker/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
execute,
onWorkflowStart,
onWorkflowComplete,
onWorkflowError,
loadDataclip,
loadCredential,
sendEvent,
Expand All @@ -27,6 +28,11 @@ import { stringify, createAttemptState } from '../../src/util';

import type { ExecutionPlan } from '@openfn/runtime';
import type { AttemptState } from '../../src/types';
import {
JOB_COMPLETE,
JOB_ERROR,
WORKFLOW_COMPLETE,
} from '@openfn/engine-multi';

const enc = new TextEncoder();

Expand Down Expand Up @@ -220,7 +226,7 @@ test('jobComplete should generate an exit reason: success', async (t) => {
t.is(event.error_message, null);
});

test.only('jobComplete should send a run:complete event', async (t) => {
test('jobComplete should send a run:complete event', async (t) => {
const plan = { id: 'attempt-1' };
const jobId = 'job-1';
const result = { x: 10 };
Expand Down Expand Up @@ -382,6 +388,113 @@ test('workflowComplete should call onFinish with final dataclip', async (t) => {
await onWorkflowComplete(context, event);
});

test('workflowError should trigger runComplete with a reason', async (t) => {
const jobId = 'job-1';

const state = {
reasons: {},
dataclips: {},
lastDataclipId: 'x',
activeJob: jobId,
activeRun: 'b',
errors: {},
};

const channel = mockChannel({
[RUN_COMPLETE]: (evt) => {
t.is(evt.reason, 'crash');
t.is(evt.error_message, 'it crashed');
return true;
},
[ATTEMPT_COMPLETE]: () => true,
});

const event = {
severity: 'crash',
type: 'Err',
message: 'it crashed',
};

const context = { channel, state, onFinish: () => {} };

await onWorkflowError(context, event);
});

test('workflow error should send reason to onFinish', async (t) => {
const jobId = 'job-1';

const state = {
reasons: {},
dataclips: {},
lastDataclipId: 'x',
activeJob: jobId,
activeRun: 'b',
errors: {},
};

const channel = mockChannel({
[RUN_COMPLETE]: (evt) => true,
[ATTEMPT_COMPLETE]: () => true,
});

const event = {
error: {
severity: 'crash',
type: 'Err',
message: 'it crashed',
},
state: {},
};

const context = {
channel,
state,
onFinish: (evt) => {
t.is(evt.reason.reason, 'crash');
},
};

await onWorkflowError(context, event);
});

test('workflowError should not call job complete if the job is not active', async (t) => {
const state = {
reasons: {},
dataclips: {},
lastDataclipId: 'x',
activeJob: undefined,
activeRun: undefined,
errors: {},
};

const channel = mockChannel({
[RUN_COMPLETE]: (evt) => {
t.fail('should not call!');
return true;
},
[ATTEMPT_COMPLETE]: () => true,
});

const event = {
error: {
severity: 'crash',
type: 'Err',
message: 'it crashed',
},
state: {},
};

const context = {
channel,
state,
onFinish: () => {
t.pass();
},
};

await onWorkflowError(context, event);
});

// TODO what if an error?
test('loadDataclip should fetch a dataclip', async (t) => {
const channel = mockChannel({
Expand Down
Loading

0 comments on commit 4bad96e

Please sign in to comment.