Skip to content

Commit

Permalink
Merge pull request #431 from OpenFn/release/worker-next
Browse files Browse the repository at this point in the history
Release/worker next
  • Loading branch information
josephjclark authored Nov 3, 2023
2 parents ea10ad4 + b7f230c commit 89124fd
Show file tree
Hide file tree
Showing 20 changed files with 236 additions and 219 deletions.
5 changes: 0 additions & 5 deletions .changeset/ninety-moose-build.md

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ jobs:
- run: pnpm config set "//registry.npmjs.org/:_authToken=${NPM_TOKEN}"
env:
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
- run: pnpm publish -r --report-summary --publish-branch main --access=public
- run: pnpm changeset tag
- run: git push --tags
- run: pnpm publish -r --report-summary --publish-branch main --access=public
- run: pnpm run generate-slack-report
env:
SLACK_TOKEN: ${{ secrets.SLACK_TOKEN }}
10 changes: 10 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-worker

## 1.0.3

### Patch Changes

- Updated dependencies [d255f32]
- Updated dependencies [f241348]
- @openfn/engine-multi@0.1.2
- @openfn/ws-worker@0.1.2
- @openfn/lightning-mock@1.0.3

## 1.0.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.2",
"version": "1.0.3",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
6 changes: 2 additions & 4 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ let lightning;
let worker;
let engine;

test.afterEach(() => {
test.afterEach(async () => {
lightning.destroy();
worker.destroy();
await worker.destroy();
});

const initLightning = () => {
Expand Down Expand Up @@ -380,7 +380,6 @@ test('stateful adaptor should create a new client for each attempt', (t) => {
let results = {};

initLightning();

lightning.on('attempt:complete', (evt) => {
const id = evt.attemptId;
results[id] = lightning.getResult(id);
Expand All @@ -392,7 +391,6 @@ test('stateful adaptor should create a new client for each attempt', (t) => {
t.is(one.threadId, two.threadId);

t.not(one.clientId, two.clientId);

done();
}
});
Expand Down
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 0.1.2

### Patch Changes

- d255f32: Defer execution to allow listeners to attach
- f241348: Make destroy async

## 0.1.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "0.1.1",
"version": "0.1.2",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
9 changes: 8 additions & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,14 @@ export default function initWorkers(
};

// This will force termination instantly
api.closeWorkers = () => workers.terminate(true);
api.closeWorkers = () => {
workers.terminate(true);

// Defer the return to allow workerpool to close down
return new Promise((done) => {
setTimeout(done, 20);
});
};
}

export function createWorkers(workerPath: string, options: WorkerOptions) {
Expand Down
16 changes: 10 additions & 6 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,15 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
delete deferredListeners[workflowId];
}

// TODO typing between the class and interface isn't right
// @ts-ignore
execute(context);
// execute(context);

// Run the execute on a timeout so that consumers have a chance
// to register listeners
setTimeout(() => {
// TODO typing between the class and interface isn't right
// @ts-ignore
execute(context);
}, 1);

// hmm. Am I happy to pass the internal workflow state OUT of the handler?
// I'd rather have like a proxy emitter or something
Expand Down Expand Up @@ -218,9 +224,7 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
// How does this work if deferred?
};

const destroy = () => {
engine.closeWorkers()
}
const destroy = () => engine.closeWorkers();

return Object.assign(engine, {
options,
Expand Down
20 changes: 0 additions & 20 deletions packages/engine-multi/src/test/slow-random.js

This file was deleted.

141 changes: 78 additions & 63 deletions packages/engine-multi/test/api/call-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ test.before(() => {

test.after(() => api.closeWorkers());

test('initWorkers should add a callWorker function', (t) => {
test.serial('initWorkers should add a callWorker function', (t) => {
t.assert(typeof api.callWorker === 'function');
});

test('callWorker should return the default result', async (t) => {
test.serial('callWorker should return the default result', async (t) => {
const result = await api.callWorker('test', []);
t.is(result, 42);
});

test('callWorker should return a custom result', async (t) => {
test.serial('callWorker should return a custom result', async (t) => {
const result = await api.callWorker('test', [84]);
t.is(result, 84);
});

test('callWorker should trigger an event callback', async (t) => {
test.serial('callWorker should trigger an event callback', async (t) => {
return new Promise((done) => {
const onCallback = ({ result }) => {
t.is(result, 11);
Expand All @@ -42,11 +42,14 @@ test('callWorker should trigger an event callback', async (t) => {
});
});

test('callWorker should throw TimeoutError if it times out', async (t) => {
await t.throwsAsync(() => api.callWorker('timeout', [11], {}, 10), {
instanceOf: WorkerPoolPromise.TimeoutError,
});
});
test.serial(
'callWorker should throw TimeoutError if it times out',
async (t) => {
await t.throwsAsync(() => api.callWorker('timeout', [11], {}, 10), {
instanceOf: WorkerPoolPromise.TimeoutError,
});
}
);

// Dang, this doesn't work, the worker threads run in the same process
test.skip('callWorker should execute with a different process id', async (t) => {
Expand All @@ -60,7 +63,7 @@ test.skip('callWorker should execute with a different process id', async (t) =>
});
});

test('callWorker should execute in a different process', async (t) => {
test.serial('callWorker should execute in a different process', async (t) => {
return new Promise((done) => {
// @ts-ignore
process.scribble = 'xyz';
Expand All @@ -75,7 +78,7 @@ test('callWorker should execute in a different process', async (t) => {
});
});

test('callWorker should try to purge workers on complete', async (t) => {
test.serial('callWorker should try to purge workers on complete', async (t) => {
return new Promise((done) => {
api.on(PURGE, () => {
t.pass('purge event called');
Expand All @@ -86,77 +89,89 @@ test('callWorker should try to purge workers on complete', async (t) => {
});
});

test('If null env is passed, worker thread should be able to access parent env', async (t) => {
const badAPI = {} as EngineAPI;
const env = null;
initWorkers(badAPI, workerPath, { env });
test.serial(
'If null env is passed, worker thread should be able to access parent env',
async (t) => {
const badAPI = {} as EngineAPI;
const env = null;
initWorkers(badAPI, workerPath, { env });

// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;
// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;

// try and read that key inside the thread
const result = await badAPI.callWorker('readEnv', ['TEST']);
// try and read that key inside the thread
const result = await badAPI.callWorker('readEnv', ['TEST']);

// voila, the kingdom is yours
t.is(result, code);
// voila, the kingdom is yours
t.is(result, code);

badAPI.closeWorkers();
});
badAPI.closeWorkers();
}
);

test('By default, worker thread cannot access parent env if env not set (no options arg)', async (t) => {
const defaultAPI = {} as EngineAPI;
test.serial(
'By default, worker thread cannot access parent env if env not set (no options arg)',
async (t) => {
const defaultAPI = {} as EngineAPI;

initWorkers(defaultAPI, workerPath /* no options passed*/);
initWorkers(defaultAPI, workerPath /* no options passed*/);

// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;
// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;

// try and read that key inside the thread
const result = await defaultAPI.callWorker('readEnv', ['TEST']);
// try and read that key inside the thread
const result = await defaultAPI.callWorker('readEnv', ['TEST']);

// No fish
t.is(result, undefined);
// No fish
t.is(result, undefined);

defaultAPI.closeWorkers();
});
defaultAPI.closeWorkers();
}
);

test('By default, worker thread cannot access parent env if env not set (with options arg)', async (t) => {
const defaultAPI = {} as EngineAPI;
test.serial(
'By default, worker thread cannot access parent env if env not set (with options arg)',
async (t) => {
const defaultAPI = {} as EngineAPI;

initWorkers(defaultAPI, workerPath, { maxWorkers: 1 });
initWorkers(defaultAPI, workerPath, { maxWorkers: 1 });

// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;
// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;

// try and read that key inside the thread
const result = await defaultAPI.callWorker('readEnv', ['TEST']);
// try and read that key inside the thread
const result = await defaultAPI.callWorker('readEnv', ['TEST']);

// No fish
t.is(result, undefined);
// No fish
t.is(result, undefined);

defaultAPI.closeWorkers();
});
defaultAPI.closeWorkers();
}
);

test('Worker thread cannot access parent env if custom env is passted', async (t) => {
const customAPI = {} as EngineAPI;
const env = { NODE_ENV: 'production' };
initWorkers(customAPI, workerPath, { env });
test.serial(
'Worker thread cannot access parent env if custom env is passted',
async (t) => {
const customAPI = {} as EngineAPI;
const env = { NODE_ENV: 'production' };
initWorkers(customAPI, workerPath, { env });

// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;
// Set up a special key on process.env
const code = '76ytghjs';
process.env.TEST = code;

// try and read that key inside the thread
const result = await customAPI.callWorker('readEnv', ['TEST']);
// try and read that key inside the thread
const result = await customAPI.callWorker('readEnv', ['TEST']);

// No fish
t.is(result, undefined);
// No fish
t.is(result, undefined);

const result2 = await customAPI.callWorker('readEnv', ['NODE_ENV']);
t.is(result2, 'production');
const result2 = await customAPI.callWorker('readEnv', ['NODE_ENV']);
t.is(result2, 'production');

customAPI.closeWorkers();
});
customAPI.closeWorkers();
}
);
Loading

0 comments on commit 89124fd

Please sign in to comment.