Skip to content

Commit

Permalink
Merge pull request #510 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Next Release
  • Loading branch information
josephjclark authored Nov 24, 2023
2 parents 8778c01 + 2033c26 commit d9fc425
Show file tree
Hide file tree
Showing 40 changed files with 919 additions and 158 deletions.
13 changes: 13 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# @openfn/integration-tests-worker

## 1.0.21

### Patch Changes

- Updated dependencies [30da946]
- Updated dependencies [c1aa9b3]
- Updated dependencies [5fdd699]
- Updated dependencies [60b6fba]
- Updated dependencies [a6dd44b]
- @openfn/ws-worker@0.2.10
- @openfn/engine-multi@0.2.1
- @openfn/lightning-mock@1.1.3

## 1.0.20

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.20",
"version": "1.0.21",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
9 changes: 6 additions & 3 deletions integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ export const initLightning = (port = 4000) => {
export const initWorker = async (lightningPort, engineArgs = {}) => {
const workerPort = randomPort();

const engineLogger = createMockLogger('engine', {
level: 'debug',
json: true,
});
const engine = await createEngine({
// logger: createLogger('engine', { level: 'debug' }),
logger: createMockLogger(),
logger: engineLogger,
repoDir: path.resolve('./tmp/repo/default'),
...engineArgs,
});
Expand All @@ -32,5 +35,5 @@ export const initWorker = async (lightningPort, engineArgs = {}) => {
secret: crypto.randomUUID(),
});

return { engine, worker };
return { engine, engineLogger, worker };
};
40 changes: 39 additions & 1 deletion integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@ import { initLightning, initWorker, randomPort } from '../src/init';
let lightning;
let worker;
let engine;
let engineLogger;
let lightningPort;

test.before(async () => {
lightningPort = randomPort();
lightning = initLightning(lightningPort);
({ worker, engine } = await initWorker(lightningPort, {
({ worker, engine, engineLogger } = await initWorker(lightningPort, {
maxWorkers: 1,
purge: false,
repoDir: path.resolve('tmp/repo/integration'),
}));
});

test.afterEach(() => {
engineLogger._reset();
});

test.after(async () => {
lightning.destroy();
await worker.destroy();
Expand Down Expand Up @@ -129,6 +134,39 @@ test('run a job which does NOT autoinstall common', (t) => {
});
});

test("Don't send job logs to stdout", (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => { console.log("@@@"); return s })',
},
],
};

lightning.once('attempt:complete', () => {
const jsonLogs = engineLogger._history.map((l) => JSON.parse(l));

// The engine logger shouldn't print out any job logs
const jobLog = jsonLogs.find((l) => l.name === 'JOB');
t.falsy(jobLog);
const jobLog2 = jsonLogs.find((l) => l.message[0] === '@@@');
t.falsy(jobLog2);

// But it SHOULD log engine stuff
const runtimeLog = jsonLogs.find(
(l) => l.name === 'R/T' && l.message[0].match(/completed job/i)
);
t.truthy(runtimeLog);
done();
});

lightning.enqueueAttempt(attempt);
});
});

test('run a job with initial state (with data)', (t) => {
return new Promise(async (done) => {
const attempt = {
Expand Down
121 changes: 114 additions & 7 deletions integration-tests/worker/test/server.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,70 @@
import test from 'ava';
import { fork } from 'node:child_process';

import { createAttempt, createJob } from '../src/factories';
import { initLightning, initWorker } from '../src/init';

let lightning;
let worker;
let workerProcess;

const spawnServer = (port: string | number = 1, args: string[] = []) => {
return new Promise((resolve) => {
const options = {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'] as any[],
};

// We use fork because we want IPC messaging with the processing
workerProcess = fork(
'./node_modules/@openfn/ws-worker/dist/start.js',
[
`-l ws://localhost:${port}/worker`,
'--backoff 0.001/0.01',
'--log debug',
'-s secretsquirrel',
...args,
],
options
);

workerProcess.on('message', (message) => {
if (message === 'READY') {
resolve(workerProcess);
}
});

// Uncomment for logs
// workerProcess.stdout.on('data', (data) => {
// console.log(data.toString());
// });
});
};

test.afterEach(async () => {
lightning.destroy();
await worker.destroy();
lightning?.destroy();
await workerProcess?.kill();
});

let portgen = 3000;

const getPort = () => ++portgen;

test('should connect to lightning', (t) => {
// note that lightning isnt available here, and this is fine
test.serial('worker should start, respond to 200, and close', async (t) => {
workerProcess = await spawnServer();

// The runnign server should respond to a get at root
let { status } = await fetch('http://localhost:2222/');
t.is(status, 200);

workerProcess.kill('SIGTERM');

// After being killed, the fetch should fail
await t.throwsAsync(() => fetch('http://localhost:2222/'), {
message: 'fetch failed',
});
});

test.serial('should connect to lightning', (t) => {
return new Promise(async (done) => {
const port = getPort();
lightning = initLightning(port);
Expand All @@ -24,11 +74,11 @@ test('should connect to lightning', (t) => {
done();
});

({ worker } = await initWorker(port));
workerProcess = await spawnServer(port);
});
});

test('should join attempts queue channel', (t) => {
test.serial('should join attempts queue channel', (t) => {
return new Promise(async (done) => {
const port = getPort();
lightning = initLightning(port);
Expand All @@ -40,6 +90,63 @@ test('should join attempts queue channel', (t) => {
}
});

({ worker } = await initWorker(port));
workerProcess = await spawnServer(port);
});
});

test.serial('allow a job to complete after receiving a sigterm', (t) => {
return new Promise(async (done) => {
let didKill = false;
const port = getPort();
lightning = initLightning(port);

const job = createJob({
// This job needs no adaptor (no autoinstall here!) and returns state after 1 second
adaptor: '',
body: 'export default [(s) => new Promise((resolve) => setTimeout(() => resolve(s), 1000))]',
});
const attempt = createAttempt([], [job], []);

lightning.once('attempt:complete', (evt) => {
t.true(didKill); // Did we kill the server before this returned?
t.is(evt.payload.reason, 'success'); // did the attempt succeed?
t.pass('ok');

// Give the server some time to shut down
setTimeout(async () => {
// The webserver should not respond
await t.throwsAsync(() => fetch('http://localhost:2222/'), {
message: 'fetch failed',
});

const finishTimeout = setTimeout(() => {
done();
}, 500);

// Lightning should receive no more claims
lightning.on('claim', () => {
clearTimeout(finishTimeout);
t.fail();
done();
});
}, 10);
});

workerProcess = await spawnServer(port);

lightning.enqueueAttempt(attempt);

// give the attempt time to start, then kill the server
setTimeout(() => {
didKill = true;
workerProcess.kill('SIGTERM');
}, 100);
});
});

test.serial('healthcheck', async (t) => {
workerProcess = await spawnServer();

let { status } = await fetch('http://localhost:2222/livez');
t.is(status, 200);
});
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 0.2.1

### Patch Changes

- 5fdd699: Don't direct job logs to stdout
- a6dd44b: Allow graceful termination of worker threads

## 0.2.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "0.2.0",
"version": "0.2.1",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
13 changes: 2 additions & 11 deletions packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ export default function initWorkers(
options: WorkerOptions = {},
logger?: Logger
) {
// TODO can we verify the worker path and throw if it's invalid?
// workerpool won't complain if we give it a nonsense path
const workers = createWorkers(workerPath, options);
engine.callWorker = (
task: string,
Expand Down Expand Up @@ -60,15 +58,8 @@ export default function initWorkers(
}
};

// This will force termination instantly
engine.closeWorkers = () => {
workers.terminate(true);

// Defer the return to allow workerpool to close down
return new Promise((done) => {
setTimeout(done, 20);
});
};
// This will force termination (with grace period if allowed)
engine.closeWorkers = async (instant?: boolean) => workers.terminate(instant);
}

export function createWorkers(workerPath: string, options: WorkerOptions) {
Expand Down
14 changes: 4 additions & 10 deletions packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,10 @@ export const log = (
event: internalEvents.LogEvent
) => {
const { threadId } = event;
// // TODO not sure about this stuff, I think we can drop it?
// const newMessage = {
// ...message,
// // Prefix the job id in all local jobs
// // I'm sure there are nicer, more elegant ways of doing this
// message: [`[${workflowId}]`, ...message.message],
// };
// TODO: if these are logs from within the runtime,
// should we use context.runtimeLogger ?
context.logger.proxy(event.message);

if (event.message.name !== 'JOB') {
context.logger.proxy(event.message);
}

context.emit(externalEvents.WORKFLOW_LOG, {
threadId,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
// How does this work if deferred?
};

const destroy = () => engine.closeWorkers();
const destroy = (instant?: boolean) => engine.closeWorkers(instant);

return Object.assign(engine, {
options,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export interface ExecutionContext extends EventEmitter {

export interface EngineAPI extends EventEmitter {
callWorker: CallWorker;
closeWorkers: () => void;
closeWorkers: (instant?: boolean) => void;
purge?: () => void;
}

Expand Down
11 changes: 10 additions & 1 deletion packages/engine-multi/test/api/autoinstall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ const mockHandleInstall = async (specifier: string): Promise<void> =>

const logger = createMockLogger();

const wait = (duration = 10) =>
new Promise((resolve) => {
setTimeout(resolve, duration);
});

const createContext = (
autoinstallOpts?,
jobs?: any[],
Expand Down Expand Up @@ -205,7 +210,11 @@ test.serial('autoinstall: install in sequence', async (t) => {
const c2 = createContext(options, [{ adaptor: '@openfn/language-common@2' }]);
const c3 = createContext(options, [{ adaptor: '@openfn/language-common@3' }]);

await Promise.all([autoinstall(c1), autoinstall(c2), autoinstall(c3)]);
autoinstall(c1);
await wait(1);
autoinstall(c2);
await wait(1);
await autoinstall(c3);

const s1 = states['@openfn/language-common@1'];
const s2 = states['@openfn/language-common@2'];
Expand Down
Loading

0 comments on commit d9fc425

Please sign in to comment.