Skip to content

Commit

Permalink
worker: trap errors coming out of the websocket (#783)
Browse files Browse the repository at this point in the history
* worker: trap errors coming out of the websocket

* formatting
  • Loading branch information
josephjclark authored Sep 25, 2024
1 parent f33a865 commit 3e6eba2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/witty-candles-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Trap errors coming out of the websocket
74 changes: 39 additions & 35 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,42 +180,46 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
// TODO this probably needs to move into ./api/ somewhere
app.execute = async ({ id, token }: ClaimRun) => {
if (app.socket) {
app.workflows[id] = true;

// TODO if we fail to join the run channel, the whole server
// will crash. Really we should just abort the run somehow
// Maybe even soft shutdown the worker
const {
channel: runChannel,
plan,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
delete app.workflows[id];
runChannel.leave();

app.events.emit(INTERNAL_RUN_COMPLETE);
};
const context = execute(
runChannel,
engine,
logger,
plan,
input,
options,
onFinish
);
try {
app.workflows[id] = true;

const {
channel: runChannel,
plan,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
}

// Callback to be triggered when the work is done (including errors)
const onFinish = () => {
logger.debug(`workflow ${id} complete: releasing worker`);
delete app.workflows[id];
runChannel.leave();

app.events.emit(INTERNAL_RUN_COMPLETE);
};
const context = execute(
runChannel,
engine,
logger,
plan,
input,
options,
onFinish
);

app.workflows[id] = context;
app.workflows[id] = context;
} catch (e) {
// Trap errors coming out of the socket
// These are likely to be comms errors with Lightning
logger.error(`Unexpected error executing ${id}`);
logger.error(e);
}
} else {
logger.error('No lightning socket established');
// TODO something else. Throw? Emit?
Expand Down

0 comments on commit 3e6eba2

Please sign in to comment.