Skip to content

Commit

Permalink
Merge pull request #516 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Next release
  • Loading branch information
josephjclark authored Nov 28, 2023
2 parents d9fc425 + 6102925 commit cb13173
Show file tree
Hide file tree
Showing 47 changed files with 973 additions and 731 deletions.
13 changes: 13 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# @openfn/integration-tests-worker

## 1.0.22

### Patch Changes

- Updated dependencies [22339c6]
- Updated dependencies [22339c6]
- Updated dependencies [04ac3cc]
- Updated dependencies [5991622]
- Updated dependencies [340b96e]
- @openfn/engine-multi@0.2.2
- @openfn/ws-worker@0.2.11
- @openfn/lightning-mock@1.1.4

## 1.0.21

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.21",
"version": "1.0.22",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
8 changes: 7 additions & 1 deletion integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ export const initLightning = (port = 4000) => {
return createLightningServer({ port });
};

export const initWorker = async (lightningPort, engineArgs = {}) => {
export const initWorker = async (
lightningPort,
engineArgs = {},
workerArgs = {}
) => {
const workerPort = randomPort();

const engineLogger = createMockLogger('engine', {
level: 'debug',
json: true,
});

const engine = await createEngine({
logger: engineLogger,
repoDir: path.resolve('./tmp/repo/default'),
Expand All @@ -33,6 +38,7 @@ export const initWorker = async (lightningPort, engineArgs = {}) => {
port: workerPort,
lightning: `ws://localhost:${lightningPort}/worker`,
secret: crypto.randomUUID(),
...workerArgs,
});

return { engine, engineLogger, worker };
Expand Down
17 changes: 17 additions & 0 deletions integration-tests/worker/src/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export const run = async (lightning, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('attempt:complete', (evt) => {
if (attempt.id === evt.attemptId) {
done(lightning.getResult(attempt.id));
} else {
// If we get here, something has gone very wrong
reject('attempt not found');
}
});

lightning.enqueueAttempt(attempt);
});
};

export const humanMb = (sizeInBytes: number) =>
Math.round(sizeInBytes / 1024 / 1024);
50 changes: 42 additions & 8 deletions integration-tests/worker/test/attempts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,28 @@ test.before(async () => {
}));
});

test.afterEach(async () => {
lightning.destroy();
});

test.after(async () => {
lightning.destroy();
await worker.destroy();
});

const run = async (attempt) => {
const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);

const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.once('attempt:complete', (evt) => {
lightning.on('run:complete', ({ payload }) => {
// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.run_id} done in ${payload.duration / 1000}s [${humanMb(
payload.mem.job
)} / ${humanMb(payload.mem.system)}mb] [thread ${payload.thread_id}]`
);
});
lightning.on('attempt:complete', (evt) => {
if (attempt.id === evt.attemptId) {
done(lightning.getResult(attempt.id));
} else {
Expand All @@ -42,7 +56,7 @@ const run = async (attempt) => {
});
};

test('echo initial state', async (t) => {
test.serial('echo initial state', async (t) => {
const initialState = { data: { count: 22 } };

lightning.addDataclip('s1', initialState);
Expand All @@ -52,7 +66,7 @@ test('echo initial state', async (t) => {
dataclip_id: 's1',
});

const result = await run(attempt);
const result = await run(t, attempt);

t.deepEqual(result, {
data: {
Expand All @@ -61,7 +75,7 @@ test('echo initial state', async (t) => {
});
});

test('start from a trigger node', async (t) => {
test.serial('start from a trigger node', async (t) => {
let runStartEvent;
let runCompleteEvent;

Expand All @@ -84,7 +98,7 @@ test('start from a trigger node', async (t) => {
runCompleteEvent = evt.payload;
});

await run(attempt);
await run(t, attempt);

t.truthy(runStartEvent);
t.is(runStartEvent.job_id, job.id);
Expand All @@ -103,7 +117,7 @@ test('start from a trigger node', async (t) => {
// hmm this event feels a bit fine-grained for this
// This file should just be about input-output
// TODO maybe move it into integrations later
test('run parallel jobs', async (t) => {
test.serial('run parallel jobs', async (t) => {
const initialState = { data: { count: 22 } };

lightning.addDataclip('s1', initialState);
Expand Down Expand Up @@ -144,7 +158,7 @@ test('run parallel jobs', async (t) => {
outputJson[evt.payload.job_id] = JSON.parse(evt.payload.output_dataclip);
});

const result = await run(attempt);
await run(t, attempt);

t.deepEqual(outputJson[x.id].data, {
a: true,
Expand All @@ -169,3 +183,23 @@ test('run parallel jobs', async (t) => {
// },
// });
});

test('run a http adaptor job', async (t) => {
const job = createJob({
adaptor: '@openfn/[email protected]',
body: 'get("https://jsonplaceholder.typicode.com/todos/1");',
});
const attempt = createAttempt([], [job], []);
const result = await run(t, attempt);

t.truthy(result.response);
t.is(result.response.status, 200);
t.truthy(result.response.headers);

t.deepEqual(result.data, {
userId: 1,
id: 1,
title: 'delectus aut autem',
completed: false,
});
});
118 changes: 118 additions & 0 deletions integration-tests/worker/test/benchmark.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import test from 'ava';
import path from 'node:path';

import { createAttempt } from '../src/factories';
import { initLightning, initWorker } from '../src/init';
import { run, humanMb } from '../src/util';

let lightning;
let worker;

const maxConcurrency = 1;

test.before(async () => {
const lightningPort = 4322;

lightning = initLightning(lightningPort);

({ worker } = await initWorker(
lightningPort,
{
repoDir: path.resolve('tmp/repo/bench'),
maxWorkers: maxConcurrency,
},
{
// Keep the backoff nice and low so that we can claim attempts quickly
backoff: { min: 0.001, max: 0.1 },
maxWorkflows: maxConcurrency,
}
));

// trigger autoinstall
const bootstrap = createAttempt(
[],
[
{
body: 'fn((s) => s)',
adaptor: '@openfn/[email protected]',
},
],
[]
);

await run(lightning, bootstrap);
});

test.afterEach(async () => {
lightning.reset();
});

test.after(async () => {
lightning.destroy();
await worker.destroy();
});

// Skipping these in CI (for now at least)
test.serial.skip('run 100 attempts', async (t) => {
return new Promise((done, reject) => {
const attemptsTotal = 100;
let attemptsComplete = 0;

let jobMax = 0;
let sysMax = 0;

const start = Date.now();

for (let i = 0; i < attemptsTotal; i++) {
const attempt = createAttempt(
[],
[
{
body: `fn((s) => new Promise(resolve => {
// create an array and fill with random items
const items = []
while (items.length > 1e6) {
items.push(Math.randomInt * 1000)
}
// sort it and stringify
s.data = items.sort().join('-')
// wait before returning
setTimeout(() => resolve(s), 100)
}))`,
adaptor: '@openfn/[email protected]',
},
],
[]
);
lightning.enqueueAttempt(attempt);
}

lightning.on('run:complete', (evt) => {
// May want to disable this but it's nice feedback
//console.log('Completed ', evt.attemptId);

if (evt.payload.reason !== 'success') {
t.log('Atempt failed:');
t.log(evt.payload);
reject('Attempt failed!');
}

attemptsComplete++;

const { job, system } = evt.payload.mem;
jobMax = Math.max(job, jobMax);
sysMax = Math.max(system, sysMax);

if (attemptsComplete === attemptsTotal) {
t.log(`${attemptsComplete} attempts processed`);
t.log(`${maxConcurrency} concurrent workers`);
t.log(`duration: ${(Date.now() - start) / 1000}s`);
t.log(`max job memory: ${humanMb(jobMax)}mb`);
t.log(`max system memory: ${humanMb(sysMax)}mb`);
t.pass('done');
done();
}
});
});
});
24 changes: 24 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,27 @@ test('exception: autoinstall error', async (t) => {
/Error installing @openfn\/[email protected]/
);
});

test('kill: oom', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: `fn((s) => {
s.data = [];
while(true) {
s.data.push(new Array(1e5).fill("xyz"))
}
})`,
},
],
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;
t.is(reason, 'kill');
t.is(error_type, 'OOMError');
t.is(error_message, 'Run exceeded maximum memory usage');
});
33 changes: 0 additions & 33 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,39 +236,6 @@ test('run a job with initial state (no top level keys)', (t) => {
});
});

test('run a http adaptor job', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: 'get("https://jsonplaceholder.typicode.com/todos/1");',
},
],
};

lightning.once('attempt:complete', () => {
const result = lightning.getResult(attempt.id);

t.truthy(result.response);
t.is(result.response.status, 200);
t.truthy(result.response.headers);

t.deepEqual(result.data, {
userId: 1,
id: 1,
title: 'delectus aut autem',
completed: false,
});

done();
});

lightning.enqueueAttempt(attempt);
});
});

// TODO this sort of works but the server side of it does not
// Will work on it more
// TODO2: the runtime doesn't return config anymore (correctly!)
Expand Down
7 changes: 7 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 0.4.10

### Patch Changes

- Updated dependencies
- @openfn/runtime@0.2.1

## 0.4.9

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "0.4.9",
"version": "0.4.10",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
Loading

0 comments on commit cb13173

Please sign in to comment.