Skip to content

Commit

Permalink
Merge pull request #491 from OpenFn/fix-purge
Browse files Browse the repository at this point in the history
Maybe fix purge?
  • Loading branch information
josephjclark authored Nov 16, 2023
2 parents d09e8be + 793d523 commit b7e0ffa
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/five-seals-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-engine': patch
---

Updated purge strategy
26 changes: 12 additions & 14 deletions packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type WorkerOptions = {

// Adds a `callWorker` function to the API object, which will execute a task in a worker
export default function initWorkers(
api: EngineAPI,
engine: EngineAPI,
workerPath: string,
options: WorkerOptions = {},
logger?: Logger
) {
// TODO can we verify the worker path and throw if it's invalid?
// workerpool won't complain if we give it a nonsense path
const workers = createWorkers(workerPath, options);
api.callWorker = (
engine.callWorker = (
task: string,
args: any[] = [],
events: any = {},
Expand All @@ -48,22 +48,20 @@ export default function initWorkers(
promise.timeout(timeout);
}

if (options.purge) {
promise.then(() => {
const { pendingTasks } = workers.stats();
if (pendingTasks == 0) {
logger?.debug('Purging workers');
api.emit(PURGE);
workers.terminate();
}
});
}

return promise;
};

engine.purge = () => {
const { pendingTasks } = workers.stats();
if (pendingTasks == 0) {
logger?.debug('Purging workers');
engine.emit(PURGE);
workers.terminate();
}
};

// This will force termination instantly
api.closeWorkers = () => {
engine.closeWorkers = () => {
workers.terminate(true);

// Defer the return to allow workerpool to close down
Expand Down
15 changes: 6 additions & 9 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
const contexts: Record<string, ExecutionContext> = {};
const deferredListeners: Record<string, Record<string, EventHandler>[]> = {};

// TODO I think this is for later
//const activeWorkflows: string[] = [];

// TOOD I wonder if the engine should a) always accept a worker path
// and b) validate it before it runs
let resolvedWorkerPath;
if (workerPath) {
// If a path to the worker has been passed in, just use it verbatim
Expand Down Expand Up @@ -183,14 +178,17 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
delete deferredListeners[workflowId];
}

// execute(context);

// Run the execute on a timeout so that consumers have a chance
// to register listeners
setTimeout(() => {
// TODO typing between the class and interface isn't right
// @ts-ignore
execute(context);
execute(context).finally(() => {
delete contexts[workflowId];
if (options.purge && Object.keys(contexts).length === 0) {
engine.purge?.();
}
});
}, 1);

// hmm. Am I happy to pass the internal workflow state OUT of the handler?
Expand All @@ -202,7 +200,6 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
context.once(evt, fn),
off: (evt: string, fn: (...args: any[]) => void) => context.off(evt, fn),
};
// return context;
};

const listen = (
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface ExecutionContext extends EventEmitter {
export interface EngineAPI extends EventEmitter {
callWorker: CallWorker;
closeWorkers: () => void;
purge?: () => void;
}

export interface RuntimeEngine extends EventEmitter {
Expand Down
13 changes: 1 addition & 12 deletions packages/engine-multi/test/api/call-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let api = new EventEmitter() as EngineAPI;
const workerPath = path.resolve('src/test/worker-functions.js');

test.before(() => {
initWorkers(api, workerPath, { purge: true });
initWorkers(api, workerPath);
});

test.after(() => api.closeWorkers());
Expand Down Expand Up @@ -78,17 +78,6 @@ test.serial('callWorker should execute in a different process', async (t) => {
});
});

test.serial('callWorker should try to purge workers on complete', async (t) => {
return new Promise((done) => {
api.on(PURGE, () => {
t.pass('purge event called');
done();
});

api.callWorker('test', []);
});
});

test.serial(
'If null env is passed, worker thread should be able to access parent env',
async (t) => {
Expand Down
82 changes: 81 additions & 1 deletion packages/engine-multi/test/engine.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import test from 'ava';
import path from 'node:path';
import { createMockLogger } from '@openfn/logger';
import { createPlan } from '../src/test/util';

import createEngine from '../src/engine';
import * as e from '../src/events';
Expand All @@ -22,6 +21,7 @@ const options = {
autoinstall: {
handleIsInstalled: async () => true,
},
purge: true,
};

let engine;
Expand Down Expand Up @@ -239,3 +239,83 @@ test.serial('timeout the whole attempt and emit an error', async (t) => {
engine.execute(plan, opts);
});
});

test.serial('Purge workers when a run is complete', async (t) => {
return new Promise(async (done) => {
const p = path.resolve('src/test/worker-functions.js');
engine = await createEngine(options, p);

const plan = {
id: 'a',
jobs: [
{
expression: '34',
},
],
};

engine.on(e.PURGE, () => {
t.pass('purge event called');
done();
});

engine.execute(plan);
});
});

test.serial('Purge workers when run errors', async (t) => {
return new Promise(async (done) => {
const p = path.resolve('src/test/worker-functions.js');
engine = await createEngine(options, p);

const plan = {
id: 'a',
jobs: [
{
expression: 'throw new Error("test")',
},
],
};

engine.on(e.PURGE, () => {
t.pass('purge event called');
done();
});

engine.execute(plan);
});
});

test.serial("Don't purge if purge is false", async (t) => {
return new Promise(async (done) => {
const p = path.resolve('src/test/worker-functions.js');
engine = await createEngine(
{
...options,
purge: false,
},
p
);

const plan = {
id: 'a',
jobs: [
{
expression: '34',
},
],
};

engine.on(e.PURGE, () => {
t.fail('purge event called');
done();
});

engine.execute(plan).on(e.WORKFLOW_COMPLETE, () => {
setTimeout(() => {
t.pass('no purge called within 50ms');
done();
}, 50);
});
});
});
2 changes: 1 addition & 1 deletion packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ test.serial('trigger job-complete', (t) => {

api.execute(plan).on('job-complete', (evt) => {
t.deepEqual(evt.next, []);
t.true(evt.duration < 10);
t.true(evt.duration < 20);
t.is(evt.jobId, 'j1');
t.deepEqual(evt.state, { data: {} });
t.pass('job completed');
Expand Down

0 comments on commit b7e0ffa

Please sign in to comment.