Skip to content

Commit

Permalink
Engine: support multiple inputs to a step (#704)
Browse files Browse the repository at this point in the history
* runtime: support steps running multiple times

downstream steps will be executed multiple times. Repeat steps given a -n suffix. multiple returns still supported

* runtime: update tests

* tests: update to support multiple inputs

* release: [email protected] [email protected]
  • Loading branch information
josephjclark authored Jun 4, 2024
1 parent 07050f7 commit a653328
Show file tree
Hide file tree
Showing 19 changed files with 323 additions and 68 deletions.
15 changes: 0 additions & 15 deletions integration-tests/cli/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,6 @@ test.serial('circular workflow', async (t) => {
t.regex(error.message[0].message, /circular dependency: b <-> a/i);
});

test.serial('multiple inputs', async (t) => {
const { stdout, err } = await run(
`openfn ${jobsPath}/multiple-inputs.json --log-json`
);
t.is(err.code, 1);

const stdlogs = extractLogs(stdout);

assertLog(t, stdlogs, /Error validating execution plan/i);
assertLog(t, stdlogs, /Workflow failed/i);

const error = stdlogs.find((l) => l.message[0].name === 'ValidationError');
t.regex(error.message[0].message, /multiple dependencies detected for: c/i);
});

test.serial('invalid start on workflow (not found)', async (t) => {
const { stdout, err } = await run(
`openfn ${jobsPath}/invalid-start.json --log-json`
Expand Down
8 changes: 8 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/integration-tests-worker

## 1.0.46

### Patch Changes

- @openfn/engine-multi@1.1.9
- @openfn/lightning-mock@2.0.9
- @openfn/ws-worker@1.1.11

## 1.0.45

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.45",
"version": "1.0.46",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/cli

## 1.3.2

### Patch Changes

- Enable a step to have multiple inputs
- Updated dependencies
- @openfn/runtime@1.2.0

## 1.3.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.3.1",
"version": "1.3.2",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 1.1.9

### Patch Changes

- Updated dependencies
- @openfn/runtime@1.2.0

## 1.1.8

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.1.8",
"version": "1.1.9",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
8 changes: 8 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/lightning-mock

## 2.0.9

### Patch Changes

- Updated dependencies
- @openfn/runtime@1.2.0
- @openfn/engine-multi@1.1.9

## 2.0.8

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.0.8",
"version": "2.0.9",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/runtime

## 1.2.0

### Minor Changes

- Enable a step to have multiple inputs

## 1.1.3

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/runtime",
"version": "1.1.3",
"version": "1.2.0",
"description": "Job processing runtime.",
"type": "module",
"exports": {
Expand Down
46 changes: 29 additions & 17 deletions packages/runtime/src/execute/plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ const executePlan = async (

const { workflow, options } = compiledPlan;

let queue: string[] = [options.start];

const ctx = {
plan: compiledPlan,
opts,
Expand All @@ -40,10 +38,7 @@ const executePlan = async (
notify: opts.callbacks?.notify ?? (() => {}),
};

// record of state returned by every job
const stateHistory: Record<string, State> = {};

// Record of state on lead nodes (nodes with no next)
// Record of state on leaf nodes (nodes with no next)
const leaves: Record<string, State> = {};

if (typeof input === 'string') {
Expand All @@ -56,19 +51,36 @@ const executePlan = async (
opts.callbacks?.notify?.(NOTIFY_STATE_LOAD, { duration, jobId: id });
logger.success(`loaded state for ${id} in ${duration}ms`);
}

const queue: Array<{ stepName: string; input: any }> = [
{ stepName: options.start, input },
];

// count how many times each step has been called
const counts: Record<string, number> = {};

// Right now this executes in series, even if jobs are parallelised
while (queue.length) {
const next = queue.shift()!;
const job = workflow.steps[next];
const { stepName, input: prevState } = queue.shift()!;

const prevState = stateHistory[job.previous || ''] ?? input;
const step = workflow.steps[stepName];

const result = await executeStep(ctx, job, prevState);
stateHistory[next] = result.state;
if (isNaN(counts[stepName])) {
counts[stepName] = 0;
} else {
counts[stepName] += 1;
}

// create a unique step id
// leave the first step as just the step name to preserve legacy stuff
const stepId =
counts[stepName] === 0 ? stepName : `${step.id}-${counts[stepName]}`;

const exitEarly = options.end === next;
if (exitEarly || !result.next.length) {
leaves[next] = stateHistory[next];
const result = await executeStep(ctx, step, prevState);

const exitEarly = options.end === stepName;
if (result.state && (exitEarly || !result.next.length)) {
leaves[stepId] = result.state;
}

if (exitEarly) {
Expand All @@ -77,9 +89,9 @@ const executePlan = async (
break;
}

if (result.next) {
queue.push(...result.next);
}
result.next?.forEach((next) => {
queue.push({ stepName: next, input: result.state });
});
}

// If there are multiple leaf results, return them
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ const executeStep = async (
let result: any = input;
let next: string[] = [];
let didError = false;

if (step.expression) {
const job = step as Job;
const jobId = job.id!;
Expand Down
14 changes: 0 additions & 14 deletions packages/runtime/src/util/validate-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export default (plan: ExecutionPlan) => {

const model = buildModel(plan);
assertNoCircularReferences(model);
assertSingletonDependencies(model);

return true;
};
Expand Down Expand Up @@ -104,16 +103,3 @@ const assertNoCircularReferences = (model: Model) => {
search(id, id, 'up'); // TODO do we even need to do this?
}
};

// This ensures that each step only has a single upstream edge,
// ie, each step only has a single input
// This is importand for the `--cache` functionality in the CLI,
// which assumes this rule when working out the input to a custom start node
const assertSingletonDependencies = (model: Model) => {
for (const id in model) {
const node = model[id];
if (Object.keys(node.up).length > 1) {
throw new ValidationError(`Multiple dependencies detected for: ${id}`);
}
}
};
Loading

0 comments on commit a653328

Please sign in to comment.