Skip to content

Commit

Permalink
runtime: allow each job in a workflow to use a different adaptor version
Browse files Browse the repository at this point in the history
Basically instead of looking up the adaptor version from the global list of options, we calculate the version for each step and pass that through to execution
  • Loading branch information
josephjclark committed Feb 22, 2024
1 parent d5a740b commit 3cacfbe
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 6 deletions.
7 changes: 7 additions & 0 deletions packages/runtime/src/execute/compile-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
import compileFunction from '../modules/compile-function';
import { conditionContext, Context } from './context';
import { ExecutionPlan, Job, StepEdge, Workflow } from '@openfn/lexicon';
import { getNameAndVersion } from '../modules/repo';

const compileEdges = (
from: string,
Expand Down Expand Up @@ -127,6 +128,12 @@ export default (plan: ExecutionPlan) => {
'name',
]);

if ((step as Job).adaptor) {
const job = step as Job;
const { name, version } = getNameAndVersion(job.adaptor!);
newStep.linker = { [name]: { version: version! } };
}

if (step.next) {
trapErrors(() => {
newStep.next = compileEdges(stepId, step.next!, context);
Expand Down
14 changes: 11 additions & 3 deletions packages/runtime/src/execute/expression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
assertSecurityKill,
} from '../errors';
import type { JobModule, ExecutionContext } from '../types';
import { ModuleInfoMap } from '../modules/linker';

export type ExecutionErrorWrapper = {
state: any;
Expand All @@ -28,7 +29,10 @@ export type ExecutionErrorWrapper = {
export default (
ctx: ExecutionContext,
expression: string | Operation[],
input: State
input: State,
// allow custom linker options to be passed for this step
// this lets us use multiple versions of the same adaptor in a workflow
moduleOverrides?: ModuleInfoMap
) =>
new Promise(async (resolve, reject) => {
let duration = Date.now();
Expand All @@ -42,7 +46,8 @@ export default (
const { operations, execute } = await prepareJob(
expression,
context,
opts
opts,
moduleOverrides
);
// Create the main reducer function
const reducer = (execute || defaultExecute)(
Expand Down Expand Up @@ -125,11 +130,14 @@ export const wrapOperation = (
const prepareJob = async (
expression: string | Operation[],
context: Context,
opts: Options = {}
opts: Options = {},
moduleOverrides: ModuleInfoMap = {}
): Promise<JobModule> => {
if (typeof expression === 'string') {
const exports = await loadModule(expression, {
...opts.linker,
// allow module paths and versions to be overriden from the defaults
modules: Object.assign({}, opts.linker?.modules, moduleOverrides),
context,
log: opts.logger,
});
Expand Down
6 changes: 5 additions & 1 deletion packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,17 @@ const executeStep = async (

const timerId = `step-${jobId}`;
logger.timer(timerId);

// TODO can we include the adaptor version here?
// How would we get it?
logger.info(`Starting step ${jobName}`);

const startTime = Date.now();
try {
// TODO include the upstream job?
notify(NOTIFY_JOB_START, { jobId });
result = await executeExpression(ctx, job.expression, state);

result = await executeExpression(ctx, job.expression, state, step.linker);
} catch (e: any) {
didError = true;
if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) {
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/src/modules/module-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export default async (
opts: Options = {}
): Promise<ModuleExports> => {
validate(src);

const context = opts.context || vm.createContext();
const linker = opts.linker || mainLinker;

Expand Down
5 changes: 5 additions & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
NOTIFY_INIT_START,
NOTIFY_STATE_LOAD,
} from './events';
import { ModuleInfoMap } from './modules/linker';

export type CompiledEdge =
| boolean
Expand All @@ -23,6 +24,10 @@ export type CompiledStep = Omit<Step, 'next'> & {
id: StepId;
next?: Record<StepId, CompiledEdge>;

// custom overrides for the linker
// This lets us set version or even path per job
linker?: ModuleInfoMap;

[other: string]: any;
};

Expand Down
25 changes: 25 additions & 0 deletions packages/runtime/test/execute/compile-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ test('should reset job ids for each call', (t) => {
t.is(second.workflow.steps['job-1'].expression, 'x');
});

test('should write adaptor versions', (t) => {
const plan = {
workflow: {
steps: [
{
id: 'x',
expression: '.',
adaptor: '[email protected]',
},
{
id: 'y',
expression: '.',
adaptor: '[email protected]',
},
],
},
options: {},
};

const { workflow } = compilePlan(plan);
const { x, y } = workflow.steps;
t.deepEqual(x.linker, { x: { version: '1.0' } });
t.deepEqual(y.linker, { y: { version: '1.0' } });
});

test('should set the start to steps[0]', (t) => {
const plan: ExecutionPlan = {
workflow: {
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/test/modules/module-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ test('load a module with an import', async (t) => {
t.assert(m.default === 20);
});

test('load a module with aribtrary exports', async (t) => {
test('load a module with aribitrary exports', async (t) => {
const src = 'export const x = 10; export const y = 20;';

const m = await loadModule(src);
Expand Down
100 changes: 100 additions & 0 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,106 @@ test('run from an adaptor', async (t) => {
t.deepEqual(result, { data: 22 });
});

test('run a workflow using the repo and load the default version', async (t) => {
const expression = `
import result from 'ultimate-answer';
export default [() => result];
`;
const plan = {
workflow: {
steps: [
{
id: 'a',
expression,
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
},
}
);

t.deepEqual(result, 43);
});

test('run a workflow using the repo using a specific version', async (t) => {
const expression = `
import result from 'ultimate-answer';
export default [() => result];
`;
const plan = {
workflow: {
steps: [
{
id: 'a',
expression,
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
modules: {
'ultimate-answer': { version: '1.0.0' },
},
},
}
);

t.deepEqual(result, 42);
});

test('run a workflow using the repo with multiple versions of the same adaptor', async (t) => {
const plan = {
workflow: {
steps: [
{
id: 'a',
expression: `import result from 'ultimate-answer';
export default [(s) => { s.data.a = result; return s;}];`,
adaptor: '[email protected]',
next: { b: true },
},
{
id: 'b',
expression: `import result from 'ultimate-answer';
export default [(s) => { s.data.b = result; return s;}];`,
adaptor: '[email protected]',
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
},
}
);

t.deepEqual(result, {
data: {
a: 42,
b: 43,
},
});
});

// https://github.com/OpenFn/kit/issues/520
test('run from an adaptor with error', async (t) => {
const expression = `
Expand Down

0 comments on commit 3cacfbe

Please sign in to comment.