Skip to content

Commit

Permalink
Merge pull request #484 from OpenFn/parallel-state
Browse files Browse the repository at this point in the history
Fix input_data_clip id
  • Loading branch information
josephjclark authored Nov 15, 2023
2 parents a03ab3e + ea41bd7 commit d52143b
Show file tree
Hide file tree
Showing 40 changed files with 1,540 additions and 242 deletions.
10 changes: 10 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-worker

## 1.0.16

### Patch Changes

- Updated dependencies [c8e9d51]
- Updated dependencies [0fb2d58]
- @openfn/engine-multi@0.1.10
- @openfn/ws-worker@0.2.6
- @openfn/lightning-mock@1.0.11

## 1.0.15

### Patch Changes
Expand Down
7 changes: 4 additions & 3 deletions integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.15",
"version": "1.0.16",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
"type": "module",
"scripts": {
"clean": "rimraf dist tmp/repo/*",
"clean": "rimraf dist tmp/repo/**",
"build:pack": "pnpm clean && cd ../.. && pnpm pack:local integration-tests/worker/dist --no-version",
"build": "pnpm build:pack && docker build --tag worker-integration-tests .",
"start": "docker run worker-integration-tests",
"test": "pnpm clean && npx ava -s --timeout 2m"
"test": "pnpm clean && npx ava -s --timeout 2m",
"test:cache": "npx ava -s --timeout 2m"
},
"dependencies": {
"@openfn/engine-multi": "workspace:^",
Expand Down
35 changes: 35 additions & 0 deletions integration-tests/worker/src/factories.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import crypto from 'node:crypto';

export const createAttempt = (triggers, jobs, edges, args) => ({
id: crypto.randomUUID(),
triggers,
jobs,
edges,
...args,
});

export const createJob = (args) => ({
id: crypto.randomUUID(),
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => s)',
...args,
});

export const createTrigger = () => ({
id: crypto.randomUUID(),
});

export const createEdge = (a: any, b: any, condition?: string) => {
const edge: any = {
id: crypto.randomUUID(),
target_job_id: b.id,
};
if (!a.body) {
edge.source_trigger_id = a.id;
} else {
edge.source_job_id = a.id;
}
return edge;
};

export default createAttempt;
2 changes: 1 addition & 1 deletion integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import crypto from 'node:crypto';
import createLightningServer from '@openfn/lightning-mock';
import createEngine from '@openfn/engine-multi';
import createWorkerServer from '@openfn/ws-worker';
import { createMockLogger } from '@openfn/logger';
import createLogger, { createMockLogger } from '@openfn/logger';

export const randomPort = () => parseInt(2000 + Math.random() * 1000);

Expand Down
171 changes: 171 additions & 0 deletions integration-tests/worker/test/attempts.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import test from 'ava';
import path from 'node:path';

import {
createAttempt,
createEdge,
createJob,
createTrigger,
} from '../src/factories';
import { initLightning, initWorker } from '../src/init';

let lightning;
let worker;

test.before(async () => {
const lightningPort = 4321;

lightning = initLightning(lightningPort);

({ worker } = await initWorker(lightningPort, {
repoDir: path.resolve('tmp/repo/attempts'),
}));
});

test.after(async () => {
lightning.destroy();
await worker.destroy();
});

const run = async (attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.once('attempt:complete', (evt) => {
if (attempt.id === evt.attemptId) {
done(lightning.getResult(attempt.id));
} else {
// If we get here, something has gone very wrong
reject('attempt not found');
}
});

lightning.enqueueAttempt(attempt);
});
};

test('echo initial state', async (t) => {
const initialState = { data: { count: 22 } };

lightning.addDataclip('s1', initialState);

const job = createJob({ body: 'fn((s) => s)' });
const attempt = createAttempt([], [job], [], {
dataclip_id: 's1',
});

const result = await run(attempt);

t.deepEqual(result, {
data: {
count: 22,
},
});
});

test('start from a trigger node', async (t) => {
let runStartEvent;
let runCompleteEvent;

const initialState = { data: { count: 22 } };

lightning.addDataclip('s1', initialState);

const trigger = createTrigger();
const job = createJob({ body: 'fn((s) => s)' });
const edge = createEdge(trigger, job);
const attempt = createAttempt([trigger], [job], [edge], {
dataclip_id: 's1',
});

lightning.once('run:start', (evt) => {
runStartEvent = evt.payload;
});

lightning.once('run:complete', (evt) => {
runCompleteEvent = evt.payload;
});

await run(attempt);

t.truthy(runStartEvent);
t.is(runStartEvent.job_id, job.id);
t.truthy(runStartEvent.run_id);
t.is(runStartEvent.input_dataclip_id, 's1');

t.truthy(runCompleteEvent);
t.is(runCompleteEvent.reason, 'success');
t.is(runCompleteEvent.error_message, null);
t.is(runCompleteEvent.error_type, null);
t.is(runCompleteEvent.job_id, job.id);
t.truthy(runCompleteEvent.output_dataclip_id);
t.is(runCompleteEvent.output_dataclip, JSON.stringify(initialState));
});

// hmm this event feels a bit fine-grained for this
// This file should just be about input-output
// TODO maybe move it into integrations later
test('run parallel jobs', async (t) => {
const initialState = { data: { count: 22 } };

lightning.addDataclip('s1', initialState);

/*
[a]
/ \
[x] [y]
*/
const a = createJob({ body: 'fn((s) => ({ data: { a: true }}))' });
const x = createJob({ body: 'fn((s) => { s.data.x = true; return s; })' });
const y = createJob({ body: 'fn((s) => { s.data.y = true; return s; })' });
const ax = createEdge(a, x);
const ay = createEdge(a, y);
const jobs = [a, x, y];
const edges = [ax, ay];

const attempt = createAttempt([], jobs, edges, {
dataclip_id: 's1',
});

// This saves JSON returned by a job
const outputJson = {};

// This saves the dataclip returned by a job
const outputId = {};

lightning.on('run:start', (evt) => {
// x and y should both be passed the dataclip produced by job a
if (evt.payload.run_id === x.id || evt.payload.run_id === y.id) {
evt.payload.input_dataclip_id = outputId[a.id];
}
});

lightning.on('run:complete', (evt) => {
// save the output dataclip
outputJson[evt.payload.job_id] = evt.payload.output_dataclip_id;
outputJson[evt.payload.job_id] = JSON.parse(evt.payload.output_dataclip);
});

const result = await run(attempt);

t.deepEqual(outputJson[x.id].data, {
a: true,
x: true,
// Should not include a write from y
});
t.deepEqual(outputJson[y.id].data, {
a: true,
y: true,
// Should not include a write from x
});

// I think the result should look like this - but it won't without work
// t.deepEqual(result, {
// [x.id]: {
// a: true,
// x: true,
// },
// [y.id]: {
// a: true,
// y: true,
// },
// });
});
4 changes: 2 additions & 2 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ test.before(async () => {
lightning = initLightning(lightningPort);

({ worker } = await initWorker(lightningPort, {
repoDir: path.resolve('tmp/openfn/repo/exit-reason'),
repoDir: path.resolve('tmp/repo/exit-reason'),
}));
});

Expand All @@ -24,7 +24,7 @@ test.after(async () => {

const run = async (attempt) => {
return new Promise<any>(async (done) => {
lightning.on('attempt:complete', (evt) => {
lightning.once('attempt:complete', (evt) => {
if (attempt.id === evt.attemptId) {
done(evt.payload);
}
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ test.before(async () => {
({ worker, engine } = await initWorker(lightningPort, {
maxWorkers: 1,
purge: false,
repoDir: path.resolve('tmp/openfn/repo/integration'),
repoDir: path.resolve('tmp/repo/integration'),
}));
});

Expand Down
7 changes: 7 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 0.4.7

### Patch Changes

- Updated dependencies [7f352d2]
- @openfn/runtime@0.1.3

## 0.4.6

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "0.4.6",
"version": "0.4.7",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
8 changes: 8 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# engine-multi

## 0.1.10

### Patch Changes

- c8e9d51: Forward next from job complete
- Updated dependencies [7f352d2]
- @openfn/runtime@0.1.3

## 0.1.9

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "0.1.9",
"version": "0.1.10",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
8 changes: 5 additions & 3 deletions packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export const workflowComplete = (
const { workflowId, state: result, threadId } = event;

logger.success('complete workflow ', workflowId);
logger.info(state);
//logger.info(event.state);

// TODO I don't know how we'd get here in this architecture
// if (!allWorkflows.has(workflowId)) {
Expand Down Expand Up @@ -84,13 +84,14 @@ export const jobComplete = (
context: ExecutionContext,
event: internalEvents.JobCompleteEvent
) => {
const { threadId, state, duration, jobId } = event;
const { threadId, state, duration, jobId, next } = event;

context.emit(externalEvents.JOB_COMPLETE, {
threadId,
state,
duration,
jobId,
next,
});
};

Expand All @@ -100,14 +101,15 @@ export const jobError = (
context: ExecutionContext,
event: internalEvents.JobErrorEvent
) => {
const { threadId, state, error, duration, jobId } = event;
const { threadId, state, error, duration, jobId, next } = event;

context.emit(externalEvents.JOB_ERROR, {
threadId,
state,
error,
duration,
jobId,
next,
});
};

Expand Down
2 changes: 2 additions & 0 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ export interface JobCompletePayload extends ExternalEvent {
jobId: string;
duration: number;
state: any; // the result state
next: string[]; // downstream jobs
}

export interface JobErrorPayload extends ExternalEvent {
jobId: string;
duration: number;
state: any; // the result state
error: any;
next: string[]; // downstream jobs
}

export interface WorkerLogPayload extends ExternalEvent, JSONLog {}
Expand Down
2 changes: 2 additions & 0 deletions packages/engine-multi/src/worker/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ export interface JobCompleteEvent extends InternalEvent {
jobId: string;
state: any;
duration: number;
next: string[];
}

export interface JobErrorEvent extends InternalEvent {
jobId: string;
state: any;
error: any; // TODO this should be one of our errors
duration: number;
next: string[];
}

export interface LogEvent extends InternalEvent {
Expand Down
Loading

0 comments on commit d52143b

Please sign in to comment.