Skip to content

Commit

Permalink
Merge pull request #760 from OpenFn/worker-handle-bad-attempts
Browse files Browse the repository at this point in the history
Worker: don't blow up if a run doesn't have a start node
  • Loading branch information
josephjclark authored Sep 5, 2024
2 parents 30aaa8e + 78de131 commit db48969
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 18 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.56

### Patch Changes

- Updated dependencies [ca07db4]
- @openfn/ws-worker@1.6.1

## 1.0.55

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.55",
"version": "1.0.56",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
25 changes: 25 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ test('crash: syntax error', async (t) => {
t.regex(error_message, /Unexpected token \(1:9\)$/);
});

// https://github.com/OpenFn/kit/issues/758
test('crash: job not found', async (t) => {
lightning.addDataclip('x', {});

const attempt = {
id: crypto.randomUUID(),
jobs: [
{
id: 'x',
adaptor: '@openfn/language-common@latest',
body: 'fn(s => s)',
},
],
dataclip_id: 'x', // having a data clip is important to trigger the crash
starting_node_id: 'y',
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;
t.is(reason, 'crash');
t.is(error_type, 'ValidationError');
t.regex(error_message, /could not find start job: y/i);
});

test('exception: autoinstall error', async (t) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down
6 changes: 5 additions & 1 deletion packages/lightning-mock/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ const args = yargs(hideBin(process.argv))

const logger = createLogger('LNG', { level: args.log });

createLightningServer({
const server = createLightningServer({
port: args.port,
logger,
logLevel: args.log,
});

// add a default credential
server.addCredential('c', { user: 'user ' });
server.addDataclip('d', { data: {} });

logger.success('Started mock Lightning server on ', args.port);
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.6.1

### Patch Changes

- ca07db4: Fix an issue where a run with a missing start node caused the server to crash

## 1.6.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.6.0",
"version": "1.6.1",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
2 changes: 0 additions & 2 deletions packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ const destroy = async (app: ServerApp, logger: Logger) => {
await app.engine.destroy();
app.socket?.disconnect();

logger.info('Server closed....');

resolve();
}),
]);
Expand Down
1 change: 0 additions & 1 deletion packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ export function execute(
loadedInput = await loadDataclip(channel, input);
logger.success('dataclip loaded');
} catch (e: any) {
// abort with error
return handleRunError(context, {
workflowId: plan.id!,
message: `Failed to load dataclip ${input}${
Expand Down
15 changes: 10 additions & 5 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
if (app.killWorkloop) {
app.killWorkloop();
delete app.killWorkloop;
logger.info('Connection to lightning lost.');
logger.info(
'Worker will automatically reconnect when lightning is back online.'
);
// So far as I know, the socket will try and reconnect in the background forever
if (!app.destroyed) {
logger.info('Connection to lightning lost');
logger.info(
'Worker will automatically reconnect when lightning is back online'
);
// So far as I know, the socket will try and reconnect in the background forever
}
}
};

Expand Down Expand Up @@ -180,6 +182,9 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
if (app.socket) {
app.workflows[id] = true;

// TODO if we fail to join the run channel, the whole server
// will crash. Really we should just abort the run somehow
// Maybe even soft shutdown the worker
const {
channel: runChannel,
plan,
Expand Down
16 changes: 9 additions & 7 deletions packages/ws-worker/src/util/create-run-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ export default (plan: ExecutionPlan, input?: Lazy<State>): RunState => {
startNode = jobs.find(({ id }) => id === plan.options.start)!;
}

// TODO throw with validation error of some kind if this node could not be found

const initialRuns: string[] = [];
// If this is a trigger, get the downstream jobs
if (!startNode.expression) {
initialRuns.push(...Object.keys(startNode.next!));
} else {
initialRuns.push(startNode.id!);
// Note that the workflow hasn't been properly validated yet
// and it's technically possible that there is no start node
if (startNode) {
// If this is a trigger, get the downstream jobs
if (!startNode.expression) {
initialRuns.push(...Object.keys(startNode.next!));
} else {
initialRuns.push(startNode.id!);
}
}

// For any runs downstream of the initial state,
Expand Down
21 changes: 21 additions & 0 deletions packages/ws-worker/test/util/create-run-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,24 @@ test('Set initial input dataclip with a trigger with multiple downstream jobs',

t.deepEqual(run.inputDataclips, { a: 's', b: 's', c: 's' });
});

test("Do not throw if the start step doesn't exist", (t) => {
const plan = createPlan([{ id: 'a' }]);
plan.options.start = 'wibble';
const input = 'x';

createRunState(plan, input);

t.pass('did not throw');
});

test('Do not throw if there are no steps', (t) => {
const plan = createPlan([{ id: 'a' }]);
plan.workflow.steps = [];

const input = 'x';

createRunState(plan, input);

t.pass('did not throw');
});

0 comments on commit db48969

Please sign in to comment.