From a6533280d1787ebe7dfff71b922662b13e21f212 Mon Sep 17 00:00:00 2001 From: josephjclark Date: Tue, 4 Jun 2024 17:54:38 +0100 Subject: [PATCH] Engine: support multiple inputs to a step (#704) * runtime: support steps running multiple times downstream steps will be executed multiple times. Repeat steps given a -n suffix. multiple returns still supported * runtime: update tests * tests: update to support multiple inputs * release: worker@1.1.11 cli@1.3.2 --- integration-tests/cli/test/errors.test.ts | 15 -- integration-tests/worker/CHANGELOG.md | 8 + integration-tests/worker/package.json | 2 +- packages/cli/CHANGELOG.md | 8 + packages/cli/package.json | 2 +- packages/engine-multi/CHANGELOG.md | 7 + packages/engine-multi/package.json | 2 +- packages/lightning-mock/CHANGELOG.md | 8 + packages/lightning-mock/package.json | 2 +- packages/runtime/CHANGELOG.md | 6 + packages/runtime/package.json | 2 +- packages/runtime/src/execute/plan.ts | 46 ++-- packages/runtime/src/execute/step.ts | 1 - packages/runtime/src/util/validate-plan.ts | 14 -- packages/runtime/test/execute/plan.test.ts | 197 +++++++++++++++++- packages/runtime/test/runtime.test.ts | 53 +++++ .../runtime/test/util/validate-plan.test.ts | 7 +- packages/ws-worker/CHANGELOG.md | 9 + packages/ws-worker/package.json | 2 +- 19 files changed, 323 insertions(+), 68 deletions(-) diff --git a/integration-tests/cli/test/errors.test.ts b/integration-tests/cli/test/errors.test.ts index 003517ed5..c9742dd47 100644 --- a/integration-tests/cli/test/errors.test.ts +++ b/integration-tests/cli/test/errors.test.ts @@ -106,21 +106,6 @@ test.serial('circular workflow', async (t) => { t.regex(error.message[0].message, /circular dependency: b <-> a/i); }); -test.serial('multiple inputs', async (t) => { - const { stdout, err } = await run( - `openfn ${jobsPath}/multiple-inputs.json --log-json` - ); - t.is(err.code, 1); - - const stdlogs = extractLogs(stdout); - - assertLog(t, stdlogs, /Error validating execution plan/i); - assertLog(t, stdlogs, /Workflow failed/i); - - const error = stdlogs.find((l) => l.message[0].name === 'ValidationError'); - t.regex(error.message[0].message, /multiple dependencies detected for: c/i); -}); - test.serial('invalid start on workflow (not found)', async (t) => { const { stdout, err } = await run( `openfn ${jobsPath}/invalid-start.json --log-json` diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index e243ec677..31a72f051 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/integration-tests-worker +## 1.0.46 + +### Patch Changes + +- @openfn/engine-multi@1.1.9 +- @openfn/lightning-mock@2.0.9 +- @openfn/ws-worker@1.1.11 + ## 1.0.45 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index db427dfa5..fe6f38239 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.45", + "version": "1.0.46", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index a2ee42896..63802f148 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/cli +## 1.3.2 + +### Patch Changes + +- Enable a step to have multiple inputs +- Updated dependencies + - @openfn/runtime@1.2.0 + ## 1.3.1 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index 238095ac7..9dd8511cb 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.3.1", + "version": "1.3.2", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index f67d1773a..6882f822f 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,12 @@ # engine-multi +## 1.1.9 + +### Patch Changes + +- Updated dependencies + - @openfn/runtime@1.2.0 + ## 1.1.8 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 871e0fb09..83f2ff386 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.1.8", + "version": "1.1.9", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index fd5b2b434..0b0b77181 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.0.9 + +### Patch Changes + +- Updated dependencies + - @openfn/runtime@1.2.0 + - @openfn/engine-multi@1.1.9 + ## 2.0.8 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 35b70ddf9..127dcad9e 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.8", + "version": "2.0.9", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 9fd9efbd0..5b65f2737 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.2.0 + +### Minor Changes + +- Enable a step to have multiple inputs + ## 1.1.3 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index a5edb25ac..23d699104 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.1.3", + "version": "1.2.0", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/plan.ts b/packages/runtime/src/execute/plan.ts index d87940714..839c94cdc 100644 --- a/packages/runtime/src/execute/plan.ts +++ b/packages/runtime/src/execute/plan.ts @@ -30,8 +30,6 @@ const executePlan = async ( const { workflow, options } = compiledPlan; - let queue: string[] = [options.start]; - const ctx = { plan: compiledPlan, opts, @@ -40,10 +38,7 @@ const executePlan = async ( notify: opts.callbacks?.notify ?? (() => {}), }; - // record of state returned by every job - const stateHistory: Record = {}; - - // Record of state on lead nodes (nodes with no next) + // Record of state on leaf nodes (nodes with no next) const leaves: Record = {}; if (typeof input === 'string') { @@ -56,19 +51,36 @@ const executePlan = async ( opts.callbacks?.notify?.(NOTIFY_STATE_LOAD, { duration, jobId: id }); logger.success(`loaded state for ${id} in ${duration}ms`); } + + const queue: Array<{ stepName: string; input: any }> = [ + { stepName: options.start, input }, + ]; + + // count how many times each step has been called + const counts: Record = {}; + // Right now this executes in series, even if jobs are parallelised while (queue.length) { - const next = queue.shift()!; - const job = workflow.steps[next]; + const { stepName, input: prevState } = queue.shift()!; - const prevState = stateHistory[job.previous || ''] ?? input; + const step = workflow.steps[stepName]; - const result = await executeStep(ctx, job, prevState); - stateHistory[next] = result.state; + if (isNaN(counts[stepName])) { + counts[stepName] = 0; + } else { + counts[stepName] += 1; + } + + // create a unique step id + // leave the first step as just the step name to preserve legacy stuff + const stepId = + counts[stepName] === 0 ? stepName : `${step.id}-${counts[stepName]}`; - const exitEarly = options.end === next; - if (exitEarly || !result.next.length) { - leaves[next] = stateHistory[next]; + const result = await executeStep(ctx, step, prevState); + + const exitEarly = options.end === stepName; + if (result.state && (exitEarly || !result.next.length)) { + leaves[stepId] = result.state; } if (exitEarly) { @@ -77,9 +89,9 @@ const executePlan = async ( break; } - if (result.next) { - queue.push(...result.next); - } + result.next?.forEach((next) => { + queue.push({ stepName: next, input: result.state }); + }); } // If there are multiple leaf results, return them diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 39d303bac..de9d90112 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -90,7 +90,6 @@ const executeStep = async ( let result: any = input; let next: string[] = []; let didError = false; - if (step.expression) { const job = step as Job; const jobId = job.id!; diff --git a/packages/runtime/src/util/validate-plan.ts b/packages/runtime/src/util/validate-plan.ts index 737424d7b..f43eeb2e1 100644 --- a/packages/runtime/src/util/validate-plan.ts +++ b/packages/runtime/src/util/validate-plan.ts @@ -15,7 +15,6 @@ export default (plan: ExecutionPlan) => { const model = buildModel(plan); assertNoCircularReferences(model); - assertSingletonDependencies(model); return true; }; @@ -104,16 +103,3 @@ const assertNoCircularReferences = (model: Model) => { search(id, id, 'up'); // TODO do we even need to do this? } }; - -// This ensures that each step only has a single upstream edge, -// ie, each step only has a single input -// This is importand for the `--cache` functionality in the CLI, -// which assumes this rule when working out the input to a custom start node -const assertSingletonDependencies = (model: Model) => { - for (const id in model) { - const node = model[id]; - if (Object.keys(node.up).length > 1) { - throw new ValidationError(`Multiple dependencies detected for: ${id}`); - } - } -}; diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index c7657163f..9b7c485af 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -34,17 +34,6 @@ test('throw for a circular job', async (t) => { t.regex(e!.message, /circular dependency/i); }); -test('throw for a job with multiple inputs', async (t) => { - const plan = createPlan([ - createJob({ next: { job3: true } }), - createJob({ id: 'job2', next: { job3: true } }), - createJob({ id: 'job3' }), - ]); - - const e = await t.throwsAsync(() => executePlan(plan, {}, {}, mockLogger)); - t.regex(e!.message, /multiple dependencies/i); -}); - test('throw for a plan which references an undefined job', async (t) => { const plan = createPlan([createJob({ next: { job3: true } })]); @@ -351,6 +340,159 @@ test('all state is passed through successive jobs', async (t) => { }); }); +test('execute the same step twice', async (t) => { + let callCount = 0; + const notify = (evt: string, payload: any) => { + if (evt === 'job-start' && payload.jobId === 'x') { + callCount += 1; + } + }; + const plan = createPlan([ + createJob({ + id: 'start', + expression: 'export default [s => s]', + next: { a: true, b: true }, + }), + + createJob({ + id: 'a', + expression: 'export default [s => s]', + next: { x: true }, + }), + createJob({ + id: 'b', + expression: 'export default [s => s]', + next: { x: true }, + }), + + createJob({ + id: 'x', + expression: 'export default [s => s]', + }), + ]); + + await executePlan(plan, {}, { callbacks: { notify } }, mockLogger); + + t.is(callCount, 2); +}); + +test('A step executing twice should have two different inputs', async (t) => { + const plan = createPlan([ + createJob({ + id: 'start', + expression: 'export default [s => s]', + next: { a: true, b: true }, + }), + + createJob({ + id: 'a', + expression: 'export default [s => ({ ...s, a: true })]', + next: { x: true }, + }), + createJob({ + id: 'b', + expression: 'export default [s => ({ ...s, b: true })]', + next: { x: true }, + }), + + // x should receive two distinct and unique state objects + // This will throw if any state is shared between a and b + createJob({ + id: 'x', + expression: + 'export default [s => { if (s.a && s.b) throw new Error("SHARED STATE") }]', + }), + ]); + + await executePlan(plan, {}, {}, mockLogger); + + t.pass('state not shared'); +}); + +test('Return multiple results for leaf step that executes multiple times', async (t) => { + const plan = createPlan([ + createJob({ + id: 'start', + expression: 'export default [s => s]', + next: { a: true, b: true }, + }), + + createJob({ + id: 'a', + expression: 'export default [s => ({ ...s, a: true })]', + next: { x: true }, + }), + createJob({ + id: 'b', + expression: 'export default [s => ({ ...s, b: true })]', + next: { x: true }, + }), + + createJob({ + id: 'x', + expression: 'export default [s => Object.keys(s)]', + }), + ]); + + const result = await executePlan(plan, {}, {}, mockLogger); + t.deepEqual(result, { + x: ['data', 'a', 'configuration'], + 'x-1': ['data', 'b', 'configuration'], + }); +}); + +test('Downstream nodes get executed multiple times', async (t) => { + let callCount = 0; + const notify = (evt: string, payload: any) => { + if (evt === 'job-start' && payload.jobId === 'y') { + callCount += 1; + } + }; + + const plan = createPlan([ + createJob({ + id: 'start', + expression: 'export default [s => s]', + next: { a: true, b: true }, + }), + + createJob({ + id: 'a', + expression: 'export default [s => ({ ...s, a: true })]', + next: { x: true }, + }), + createJob({ + id: 'b', + expression: 'export default [s => ({ ...s, b: true })]', + next: { x: true }, + }), + + createJob({ + id: 'x', + expression: 'export default [s => ({ ...s, x: true })]', + next: { y: true }, + }), + + // This should be called twice with different inputs + createJob({ + id: 'y', + expression: 'export default [s => Object.keys(s)]', + }), + ]); + + const result = await executePlan( + plan, + {}, + { callbacks: { notify } }, + mockLogger + ); + t.is(callCount, 2); + t.deepEqual(result, { + y: ['data', 'a', 'x', 'configuration'], + 'y-1': ['data', 'b', 'x', 'configuration'], + }); +}); + test('execute edge based on state in the condition', async (t) => { const plan = createPlan([ { @@ -541,6 +683,39 @@ test('execute multiple steps in "parallel"', async (t) => { }); }); +test('ignore leaf nodes with no result', async (t) => { + const plan = createPlan( + [ + { + id: 'start', + expression: 'export default [s => s]', + next: { + a: true, + b: true, + c: true, + }, + }, + { + id: 'a', + expression: 'export default [s => { s.data.x += 1; return s; } ]', + }, + { + id: 'b', + expression: 'export default [s => null ]', + }, + { + id: 'c', + expression: 'export default [s => null ]', + }, + ], + { start: 'start' } + ); + const state = { data: { x: 0 } }; + + const result: any = await executePlan(plan, state, {}, mockLogger); + t.deepEqual(result, { data: { x: 1 } }); +}); + test('isolate state in "parallel" execution', async (t) => { const plan = createPlan( [ diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 485619401..904fc0db6 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -238,6 +238,59 @@ test('run a workflow with state and parallel branching', async (t) => { }); }); +test('run a workflow with a leaf step called multiple times', async (t) => { + const plan: ExecutionPlanNoOptions = { + workflow: { + steps: [ + { + expression: + 'export default [(s) => { s.data.count += 1; s.data.a = true; return s}]', + next: { + b: true as const, + c: true as const, + }, + }, + { + id: 'b', + expression: + 'export default [(s) => { s.data.count += 1; s.data.b = true; return s}]', + next: { z: true }, + }, + { + id: 'c', + expression: + 'export default [(s) => { s.data.count += 1; s.data.c = true; return s}]', + next: { z: true }, + }, + { + id: 'z', + expression: 'export default [(s) => s]', + }, + ], + }, + }; + + const state = { data: { count: 0 } }; + + const result: any = await run(plan, state); + t.deepEqual(result, { + z: { + data: { + count: 2, + a: true, + b: true, + }, + }, + 'z-1': { + data: { + count: 2, + a: true, + c: true, + }, + }, + }); +}); + // TODO this test sort of shows why input state on the plan object is a bit funky // running the same plan with two inputs is pretty clunky test('run a workflow with state and conditional branching', async (t) => { diff --git a/packages/runtime/test/util/validate-plan.test.ts b/packages/runtime/test/util/validate-plan.test.ts index 1f0858d06..829ded9bf 100644 --- a/packages/runtime/test/util/validate-plan.test.ts +++ b/packages/runtime/test/util/validate-plan.test.ts @@ -86,7 +86,7 @@ test('throws for an indirect circular dependency', (t) => { }); }); -test('throws for a multiple inputs', (t) => { +test('allows for a multiple inputs', (t) => { const plan: ExecutionPlan = { options: {}, workflow: { @@ -99,9 +99,8 @@ test('throws for a multiple inputs', (t) => { }, }; - t.throws(() => validate(plan), { - message: 'Multiple dependencies detected for: z', - }); + validate(plan); + t.pass(); }); test('throws for a an unknown job', (t) => { diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 33aa04db5..d94c55a39 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,14 @@ # ws-worker +## 1.1.11 + +### Patch Changes + +- Enable a step to have multiple inputs +- Updated dependencies + - @openfn/runtime@1.2.0 + - @openfn/engine-multi@1.1.9 + ## 1.1.10 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 9c9fa6f33..3062b05a8 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.1.10", + "version": "1.1.11", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",