Skip to content

Commit

Permalink
Merge pull request #524 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Next release
  • Loading branch information
taylordowns2000 authored Dec 3, 2023
2 parents 64cc482 + 2f10a27 commit 2a71ee3
Show file tree
Hide file tree
Showing 35 changed files with 593 additions and 215 deletions.
5 changes: 0 additions & 5 deletions .changeset/twenty-oranges-push.md

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ New releases will be published to npm automatically when merging into main.

Before merging to main, check out the release branch locally and run the following steps:

1. Run `pnpm changeset` from root to bump versions
1. Run `pnpm changeset version` from root to bump versions
1. Run `pnpm install`
1. Commit the new version numbers
1. Run `pnpm changeset tag` to generate tags
Expand Down
11 changes: 11 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/integration-tests-worker

## 1.0.23

### Patch Changes

- Updated dependencies [6c3e9e42]
- Updated dependencies [05ccc10b]
- Updated dependencies [7235bf5e]
- @openfn/ws-worker@0.2.12
- @openfn/engine-multi@0.2.3
- @openfn/lightning-mock@1.1.5

## 1.0.22

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.22",
"version": "1.0.23",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
20 changes: 20 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,23 @@ test('kill: oom', async (t) => {
t.is(error_type, 'OOMError');
t.is(error_message, 'Run exceeded maximum memory usage');
});

test('crash: process.exit() triggered by postgres', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]', // version number is important
body: "sql('select * from food_hygiene_interview');",
},
],
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;

t.is(reason, 'crash');
t.is(error_type, 'ExitError');
t.regex(error_message, /Process exited with code: 1/i);
});
59 changes: 59 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,65 @@ test('blacklist a non-openfn adaptor', (t) => {
});
});

test('a timeout error should still call run-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest', // version lock to something stable?
body: 'fn((s) => new Promise((resolve) => setTimeout(() => resolve(s), 1000)))',
},
],
options: {
// Including the timeout here stops the attempt returning at all
timeout: 100,
},
};

lightning.once('run:complete', (event) => {
t.is(event.payload.reason, 'kill');
});

lightning.once('attempt:complete', (event) => {
done();
});

lightning.enqueueAttempt(attempt);
});
});

test('an OOM error should still call run-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest', // version lock to something stable?
body: `
fn((s) => {
s.data = [];
while(true) {
s.data.push(new Array(1e5).fill("xyz"))
}
return s;
})`,
},
],
};

lightning.once('run:complete', (event) => {
t.is(event.payload.reason, 'kill');
});

lightning.once('attempt:complete', (event) => {
done();
});

lightning.enqueueAttempt(attempt);
});
});

// test('run a job with complex behaviours (initial state, branching)', (t) => {
// const attempt = {
// id: 'a1',
Expand Down
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 0.2.3

### Patch Changes

- 05ccc10b: Handle async errors in the runtime
- 7235bf5e: Throw a better error on process.exit

## 0.2.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "0.2.2",
"version": "0.2.3",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
23 changes: 13 additions & 10 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
jobError,
} from './lifecycle';
import preloadCredentials from './preload-credentials';
import { ExecutionError, OOMError, TimeoutError } from '../errors';
import { ExecutionError, ExitError, OOMError, TimeoutError } from '../errors';

const execute = async (context: ExecutionContext) => {
const { state, callWorker, logger, options } = context;
Expand Down Expand Up @@ -84,22 +84,25 @@ const execute = async (context: ExecutionContext) => {
events,
options.timeout
).catch((e: any) => {
// An error here is basically a crash state
if (e.code === 'ERR_WORKER_OUT_OF_MEMORY') {
// Catch process.exit from inside the thread
// This approach is not pretty - we are banking on replacing workerpool soon
if (e.message.match(/^Workerpool Worker terminated Unexpectedly/)) {
const [_match, exitCode] = e.message.match(/exitCode: `(\d+)`/);
if (exitCode === '111111') {
// This means a controlled exit from inside the worker
// The error has already been reported and we should do nothing
return;
}
e = new ExitError(parseInt(exitCode));
} else if (e.code === 'ERR_WORKER_OUT_OF_MEMORY') {
e = new OOMError();
} else if (e instanceof WorkerPoolPromise.TimeoutError) {
// Map the workerpool error to our own
e = new TimeoutError(options.timeout!);
}

// TODO: map anything else to an executionError

// TODO what information can I usefully provide here?
// DO I know which job I'm on?
// DO I know the thread id?
// Do I know where the error came from?
error(context, { workflowId: state.plan.id, error: e });
logger.error(e);
logger.error(`Critical error thrown by ${state.plan.id}`, e);
});
} catch (e: any) {
if (!e.severity) {
Expand Down
19 changes: 18 additions & 1 deletion packages/engine-multi/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class EngineError extends Error {
// This is thrown if a workflow takes too long to run
// It is generated by workerpool and thrown if the workerpool promise fails to resolve
export class TimeoutError extends EngineError {
severity = 'crash';
severity = 'kill';
type = 'TimeoutError';
duration;
constructor(durationInMs: number) {
Expand Down Expand Up @@ -82,4 +82,21 @@ export class OOMError extends EngineError {
}
}

export class ExitError extends EngineError {
severity = 'crash';
type = 'ExitError';
name = 'ExitError';
code;
message;

constructor(code: number) {
super();
this.code = code;
this.message = `Process exited with code: ${code}`;
// Remove the stack trace
// It contains no useful information
this.stack = '';
}
}

// CredentialsError (exception)
14 changes: 9 additions & 5 deletions packages/engine-multi/src/worker/worker-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,19 @@ async function helper(workflowId: string, execute: () => Promise<any>) {
// For now, we'll write this off as a crash-level generic execution error
// TODO did this come from job or adaptor code?
const e = new ExecutionError(err);
e.severity = 'crash'; // Downgrade this to a crash becuase it's likely not our fault
e.severity = 'crash'; // Downgrade this to a crash because it's likely not our fault
handleError(e);
process.exit(1);

// Close down the process justto be 100% sure that all async code stops
// This is in a timeout to give the emitted message time to escape
// There is a TINY WINDOW in which async code can still run and affect the next attempt
// This should all go away when we replace workerpool
setTimeout(() => {
process.exit(111111);
}, 2);
});

try {
// Note that the worker thread may fire logs after completion
// I think this is fine, it's just a log stream thing
// But the output is very confusing!
const result = await execute();
publish(workflowId, workerEvents.WORKFLOW_COMPLETE, { state: result });

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions packages/engine-multi/test/__repo__/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "test-repo",
"private": true,
"version": "1.0.0",
"dependencies": {
"helper_1.0.0": "@npm:[email protected]"
}
}
37 changes: 31 additions & 6 deletions packages/engine-multi/test/errors.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import test from 'ava';
import path from 'node:path';

import createEngine, { EngineOptions } from '../src/engine';
import { createMockLogger } from '@openfn/logger';
import { WORKFLOW_ERROR } from '../src/events';
import { WORKFLOW_COMPLETE, WORKFLOW_ERROR } from '../src/events';

let engine;

Expand All @@ -10,7 +12,7 @@ test.before(async () => {

const options: EngineOptions = {
logger,
repoDir: '.',
repoDir: path.resolve('./test/__repo__'),
autoinstall: {
// disable autoinstall
handleIsInstalled: async () => true,
Expand Down Expand Up @@ -102,11 +104,12 @@ test.serial('execution error from async code', (t) => {
id: 'a',
jobs: [
{
// this error will throw within the promise, and so before the job completes
// But REALLY naughty code could throw after the job has finished
// In which case it'll be ignored
// Also note that the wrapping promise will never resolve
expression: `export default [(s) => new Promise((r) => {
// this error will throw within the promise, and so before the job completes
// But REALLY naughty code could throw after the job has finished
// In which case it'll be ignored
setTimeout(() => { throw new Error(\"e\");r () }, 1)
setTimeout(() => { throw new Error(\"e1324\"); r() }, 10)
})]`,
},
],
Expand All @@ -115,6 +118,28 @@ test.serial('execution error from async code', (t) => {
engine.execute(plan).on(WORKFLOW_ERROR, (evt) => {
t.is(evt.type, 'ExecutionError');
t.is(evt.severity, 'crash');

done();
});
});
});

test.serial('emit a crash error on process.exit()', (t) => {
return new Promise((done) => {
const plan = {
id: 'z',
jobs: [
{
adaptor: '[email protected]',
expression: `export default [exit()]`,
},
],
};

engine.execute(plan).on(WORKFLOW_ERROR, (evt) => {
t.is(evt.type, 'ExitError');
t.is(evt.severity, 'crash');
t.is(evt.message, 'Process exited with code: 42');
done();
});
});
Expand Down
8 changes: 8 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/lightning-mock

## 1.1.5

### Patch Changes

- Updated dependencies [05ccc10b]
- Updated dependencies [7235bf5e]
- @openfn/engine-multi@0.2.3

## 1.1.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "1.1.4",
"version": "1.1.5",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
12 changes: 1 addition & 11 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,21 +419,11 @@ const createSocketAPI = (

let payload: any = validateReasons(evt.payload);

if (!output_dataclip) {
payload = {
status: 'error',
response: 'no output_dataclip',
};
} else if (output_dataclip_id) {
if (output_dataclip_id && output_dataclip) {
if (!state.dataclips) {
state.dataclips = {};
}
state.dataclips[output_dataclip_id] = JSON.parse(output_dataclip!);
} else {
payload = {
status: 'error',
response: 'no output_dataclip_id',
};
}

// be polite and acknowledge the event
Expand Down
Loading

0 comments on commit 2a71ee3

Please sign in to comment.