Skip to content

Commit

Permalink
Worker: ensure workers are released after an error on run:complete (#685
Browse files Browse the repository at this point in the history
)

* worker: added logging

Noisy but useful

* worker: ensure run:complete returns even if the socket throws

* changeset

* worker: add a bit more logging on the channel

* version: [email protected]

* worker: fix mock socket
  • Loading branch information
josephjclark authored May 10, 2024
1 parent 24d2014 commit 75f9087
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 7 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.42

### Patch Changes

- Updated dependencies [bdff4b2]
- @openfn/ws-worker@1.1.7

## 1.0.41

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.41",
"version": "1.0.42",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.1.7

### Patch Changes

- bdff4b2: Fix an issue where workers may not be returned to the pool if run:complete throws

## 1.1.6

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.1.6",
"version": "1.1.7",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
2 changes: 2 additions & 0 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ const claim = (

const activeWorkers = Object.keys(app.workflows).length;
if (activeWorkers >= maxWorkers) {
logger.debug('skipping claim attempt: server at capacity');
return reject(new Error('Server at capacity'));
}

if (!app.queueChannel) {
logger.debug('skipping claim attempt: websocket unavailable');
return reject(new Error('No websocket available'));
}

Expand Down
14 changes: 14 additions & 0 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,21 @@ const joinRunChannel = (
.receive('error', (err: any) => {
logger.error(`error connecting to ${channelName}`, err);
reject(err);
})
.receive('timeout', (err: any) => {
logger.error(`Timeout for ${channelName}`, err);
reject(err);
});
channel.onClose(() => {
// channel was explicitly closed by the client or server
logger.debug(`Leaving ${channelName}`);
});
channel.onError((e: any) => {
// Error occurred on the channel
// (the socket will try to reconnect with backoff)
logger.debug(`Error in ${channelName}`);
logger.debug(e);
});
});
};

Expand Down
17 changes: 12 additions & 5 deletions packages/ws-worker/src/events/run-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default async function onWorkflowComplete(
context: Context,
_event: WorkflowCompletePayload
) {
const { state, channel, onFinish } = context;
const { state, channel, onFinish, logger } = context;

// TODO I dont think the run final dataclip IS the last job dataclip
// Especially not in parallelisation
Expand All @@ -19,10 +19,17 @@ export default async function onWorkflowComplete(
const reason = calculateRunExitReason(state);
await logFinalReason(context, reason);

await sendEvent<RunCompletePayload>(channel, RUN_COMPLETE, {
final_dataclip_id: state.lastDataclipId!,
...reason,
});
try {
await sendEvent<RunCompletePayload>(channel, RUN_COMPLETE, {
final_dataclip_id: state.lastDataclipId!,
...reason,
});
} catch (e) {
logger.error(
`${state.plan.id} failed to send ${RUN_COMPLETE} event. This run will be lost!`
);
logger.error(e);
}

onFinish({ reason, state: result });
}
4 changes: 4 additions & 0 deletions packages/ws-worker/src/mock/sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export const mockChannel = (
} catch (e) {
responses.error?.(e);
}
} else {
responses.timeout?.('timeout');
}
}, 1);

Expand Down Expand Up @@ -64,6 +66,8 @@ export const mockChannel = (
return receive;
},
leave: () => {},
onClose: () => {},
onError: () => {},
};
return c;
};
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
delete app.workflows[id];
runChannel.leave();

Expand Down
97 changes: 97 additions & 0 deletions packages/ws-worker/test/events/run-complete.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import test from 'ava';
import { createMockLogger } from '@openfn/logger';

import handleRunComplete from '../../src/events/run-complete';

import { mockChannel } from '../../src/mock/sockets';
Expand Down Expand Up @@ -143,3 +145,98 @@ test('should send a reason log and return reason for fail', async (t) => {
t.is(completeEvent.error_type, 'TEST');
t.is(completeEvent.error_message, 'err');
});

test('should call onFinish even if the lightning event throws', async (t) => {
const plan = createPlan();

const state = createRunState(plan);
state.dataclips = {
x: {},
};
state.lastDataclipId = 'x';

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: () => {
throw new Error('they came from... behind!');
},
});

const event: any = {};

const logger = createMockLogger();

const context: any = {
channel,
state,
onFinish: () => {
t.pass('On finish called');
},
logger,
};
await handleRunComplete(context, event);
});

test('should log if the lightning event throws', async (t) => {
const plan = createPlan();

const state = createRunState(plan);
state.dataclips = {
x: {},
};
state.lastDataclipId = 'x';

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: () => {
throw new Error('they came from... behind!');
},
});

const event: any = {};

const logger = createMockLogger();

const context: any = {
channel,
state,
onFinish: () => {
const message = logger._find(
'error',
/failed to send run:complete event/
);
t.truthy(message);
},
logger,
};
await handleRunComplete(context, event);
});

test('should call onFinish even if the lightning event timesout', async (t) => {
const plan = createPlan();

const state = createRunState(plan);
state.dataclips = {
x: {},
};
state.lastDataclipId = 'x';

const channel = mockChannel({
[RUN_LOG]: () => true,
// no event handler is registered, so the mock will throw a timeout
});

const event: any = {};

const logger = createMockLogger();

const context: any = {
channel,
state,
onFinish: () => {
t.pass('On finish called');
},
logger,
};
await handleRunComplete(context, event);
});

0 comments on commit 75f9087

Please sign in to comment.