diff --git a/.changeset/witty-candles-shake.md b/.changeset/witty-candles-shake.md new file mode 100644 index 000000000..a760ad923 --- /dev/null +++ b/.changeset/witty-candles-shake.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Trap errors coming out of the websocket diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 6d52f389a..5e15e69c3 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -180,42 +180,46 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { // TODO this probably needs to move into ./api/ somewhere app.execute = async ({ id, token }: ClaimRun) => { if (app.socket) { - app.workflows[id] = true; - - // TODO if we fail to join the run channel, the whole server - // will crash. Really we should just abort the run somehow - // Maybe even soft shutdown the worker - const { - channel: runChannel, - plan, - options = {}, - input, - } = await joinRunChannel(app.socket, token, id, logger); - - // Default the payload limit if it's not otherwise set on the run options - if (!('payloadLimitMb' in options)) { - options.payloadLimitMb = app.options.payloadLimitMb; - } - - // Callback to be triggered when the work is done (including errors) - const onFinish = () => { - logger.debug(`workflow ${id} complete: releasing worker`); - delete app.workflows[id]; - runChannel.leave(); - - app.events.emit(INTERNAL_RUN_COMPLETE); - }; - const context = execute( - runChannel, - engine, - logger, - plan, - input, - options, - onFinish - ); + try { + app.workflows[id] = true; + + const { + channel: runChannel, + plan, + options = {}, + input, + } = await joinRunChannel(app.socket, token, id, logger); + + // Default the payload limit if it's not otherwise set on the run options + if (!('payloadLimitMb' in options)) { + options.payloadLimitMb = app.options.payloadLimitMb; + } + + // Callback to be triggered when the work is done (including errors) + const onFinish = () => { + logger.debug(`workflow ${id} complete: releasing worker`); + delete app.workflows[id]; + runChannel.leave(); + + app.events.emit(INTERNAL_RUN_COMPLETE); + }; + const context = execute( + runChannel, + engine, + logger, + plan, + input, + options, + onFinish + ); - app.workflows[id] = context; + app.workflows[id] = context; + } catch (e) { + // Trap errors coming out of the socket + // These are likely to be comms errors with Lightning + logger.error(`Unexpected error executing ${id}`); + logger.error(e); + } } else { logger.error('No lightning socket established'); // TODO something else. Throw? Emit?