Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: Update version listings #611

Merged
merged 22 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/calm-books-care.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': minor
'@openfn/runtime': minor
---

Support workflows with different versions of the same adaptor
5 changes: 5 additions & 0 deletions .changeset/fair-bobcats-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': patch
---

Record adaptor versions as an array
5 changes: 5 additions & 0 deletions .changeset/yellow-clouds-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Move version log to workflow start
38 changes: 37 additions & 1 deletion integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);
const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('step:complete', ({ payload }) => {
t.is(payload.reason, 'success');

// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
Expand Down Expand Up @@ -192,7 +194,7 @@ test.serial('run parallel jobs', async (t) => {
// });
});

test('run a http adaptor job', async (t) => {
test.serial('run a http adaptor job', async (t) => {
const job = createJob({
adaptor: '@openfn/[email protected]',
body: `get("https://jsonplaceholder.typicode.com/todos/1");
Expand All @@ -212,3 +214,37 @@ test('run a http adaptor job', async (t) => {
completed: false,
});
});

test.serial('use different versions of the same adaptor', async (t) => {
// http@5 exported an axios global - so run this job and validate that the global is there
const job1 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (!axios) {
throw new Error('AXIOS NOT FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// http@6 no longer exports axios - so throw an error if we see it
const job2 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (axios) {
throw new Error('AXIOS FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// Just for fun, run each job a couple of times to make sure that there's no wierd caching or ordering anything
const steps = [job1, job2, job1, job2];
const attempt = createRun([], steps, []);

const result = await run(t, attempt);
t.log(result);
t.falsy(result.errors);
});
27 changes: 20 additions & 7 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}

if (!skipRepoValidation && !didValidateRepo) {
// TODO what if this throws?
// Whole server probably needs to crash, so throwing is probably appropriate
// TODO do we need to do it on EVERY call? Can we not cache it?
await ensureRepo(repoDir, logger);
didValidateRepo = true;
Expand All @@ -137,12 +135,15 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {

const v = version || 'unknown';

// Write the adaptor version to the context
// This is a reasonably accurate, but not totally bulletproof, report
// @ts-ignore
context.versions[name] = v;
// Write the adaptor version to the context for reporting later
if (!context.versions[name]) {
context.versions[name] = [];
}
if (!context.versions[name].includes(v)) {
(context.versions[name] as string[]).push(v);
}

paths[name] = {
paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
version: v,
};
Expand All @@ -152,6 +153,18 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}
}

// Write linker arguments back to the plan
for (const step of plan.workflow.steps) {
const job = step as unknown as Job;
if (paths[job.adaptor!]) {
const { name } = getNameAndVersion(job.adaptor!);
// @ts-ignore
job.linker = {
[name]: paths[job.adaptor!],
};
}
}

if (adaptorsToLoad.length) {
// Add this to the queue
const p = enqueue(adaptorsToLoad);
Expand Down
12 changes: 6 additions & 6 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import {
} from './lifecycle';
import preloadCredentials from './preload-credentials';
import { ExecutionError } from '../errors';
import type { RunOptions } from '../worker/thread/run';

const execute = async (context: ExecutionContext) => {
const { state, callWorker, logger, options } = context;
try {
// TODO catch and "throw" nice clean autoinstall errors
const adaptorPaths = await autoinstall(context);
await autoinstall(context);

// TODO catch and "throw" nice clean compile errors
try {
await compile(context);
} catch (e: any) {
Expand All @@ -49,10 +48,9 @@ const execute = async (context: ExecutionContext) => {
const whitelist = options.whitelist?.map((w) => w.toString());

const runOptions = {
adaptorPaths,
whitelist,
statePropsToRemove: options.statePropsToRemove,
};
whitelist,
} as RunOptions;

const workerOptions = {
memoryLimitMb: options.memoryLimitMb,
Expand Down Expand Up @@ -109,13 +107,15 @@ const execute = async (context: ExecutionContext) => {
jobError(context, evt);
},
[workerEvents.LOG]: (evt: workerEvents.LogEvent) => {
// console.log(evt.log.name, evt.log.message);
log(context, evt);
},
// TODO this is also untested
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
error(context, { workflowId: state.plan.id, error: evt.error });
},
};

return callWorker(
'run',
[state.plan, state.input || {}, runOptions || {}],
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const workflowStart = (
// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
});
};

Expand Down Expand Up @@ -81,7 +82,6 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
versions: context.versions,
});
};

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/classes/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
import type { ExternalEvents, EventMap } from '../events';

/**
* The ExeuctionContext class wraps an event emitter with some useful context
* The ExecutionContext class wraps an event emitter with some useful context
* and automatically appends the workflow id to each emitted events
*
* Each running workflow has its own context object
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface ExternalEvent {
workflowId: string;
}

export interface WorkflowStartPayload extends ExternalEvent {}
export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
Expand All @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
versions: Versions;
}

export interface JobCompletePayload extends ExternalEvent {
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ export type Versions = {
node: string;
engine: string;
compiler: string;
[adaptor: string]: string;

[adaptor: string]: string | string[];
};
9 changes: 4 additions & 5 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { execute, createLoggers } from './helpers';
import serializeError from '../../util/serialize-error';
import { JobErrorPayload } from '../../events';

type RunOptions = {
adaptorPaths: Record<string, { path: string }>;
export type RunOptions = {
repoDir: string;
whitelist?: RegExp[];
sanitize: SanitizePolicies;
statePropsToRemove?: string[];
Expand All @@ -26,8 +26,7 @@ const eventMap = {

register({
run: (plan: ExecutionPlan, input: State, runOptions: RunOptions) => {
const { adaptorPaths, whitelist, sanitize, statePropsToRemove } =
runOptions;
const { repoDir, whitelist, sanitize, statePropsToRemove } = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
sanitize,
Expand All @@ -52,7 +51,7 @@ register({
logger,
jobLogger,
linker: {
modules: adaptorPaths,
repo: repoDir,
whitelist,
cacheKey: plan.id,
},
Expand Down
56 changes: 50 additions & 6 deletions packages/engine-multi/test/api/autoinstall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ test('Autoinstall basically works', async (t) => {
const context = createContext(autoinstallOpts);

const paths = await autoinstall(context);
t.log(paths);
t.deepEqual(paths, {
'@openfn/language-common': {
'@openfn/language-common@1.0.0': {
path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0',
version: '1.0.0',
},
Expand Down Expand Up @@ -263,15 +264,15 @@ test('autoinstall: handle two seperate, non-overlapping installs', async (t) =>

const p1 = await autoinstall(c1);
t.deepEqual(p1, {
'@openfn/language-dhis2': {
'@openfn/language-dhis2@1.0.0': {
path: 'tmp/repo/node_modules/@openfn/language-dhis2_1.0.0',
version: '1.0.0',
},
});

const p2 = await autoinstall(c2);
t.deepEqual(p2, {
'@openfn/language-http': {
'@openfn/language-http@1.0.0': {
path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0',
version: '1.0.0',
},
Expand Down Expand Up @@ -329,10 +330,53 @@ test.serial('autoinstall: return a map to modules', async (t) => {
const result = await autoinstall(context);

t.deepEqual(result, {
'@openfn/[email protected]': {
path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0',
version: '1.0.0',
},
'@openfn/[email protected]': {
path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0',
version: '1.0.0',
},
});
});

test.serial('autoinstall: write linker options back to the plan', async (t) => {
const jobs = [
{
adaptor: '@openfn/[email protected]',
},
{
adaptor: '@openfn/[email protected]',
},
{
adaptor: '@openfn/[email protected]',
},
];

const autoinstallOpts = {
skipRepoValidation: true,
handleInstall: async () => {},
handleIsInstalled: async () => false,
};
const context = createContext(autoinstallOpts, jobs);

await autoinstall(context);

const [a, b, c] = context.state.plan.workflow.steps as Job[];
t.deepEqual(a.linker, {
'@openfn/language-common': {
path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0',
version: '1.0.0',
},
});
t.deepEqual(b.linker, {
'@openfn/language-common': {
path: 'tmp/repo/node_modules/@openfn/language-common_2.0.0',
version: '2.0.0',
},
});
t.deepEqual(c.linker, {
'@openfn/language-http': {
path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0',
version: '1.0.0',
Expand Down Expand Up @@ -363,7 +407,7 @@ test.serial('autoinstall: support custom whitelist', async (t) => {
const result = await autoinstall(context);

t.deepEqual(result, {
y: {
'[email protected]': {
path: 'tmp/repo/node_modules/y_1.0.0',
version: '1.0.0',
},
Expand Down Expand Up @@ -521,7 +565,7 @@ test('write versions to context', async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});

test("write versions to context even if we don't install", async (t) => {
Expand All @@ -534,5 +578,5 @@ test("write versions to context even if we don't install", async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});
7 changes: 3 additions & 4 deletions packages/engine-multi/test/api/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
};

context.on(e.WORKFLOW_START, (evt) => {
t.deepEqual(evt, {
workflowId,
threadId: '123',
});
t.truthy(evt.versions);
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '123');
done();
});

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ test.serial('trigger workflow-start', (t) => {
api.execute(plan, emptyState).on('workflow-start', (evt) => {
t.is(evt.workflowId, plan.id);
t.truthy(evt.threadId);
t.truthy(evt.versions);
t.pass('workflow started');
done();
});
Expand All @@ -77,7 +78,6 @@ test.serial('trigger job-start', (t) => {
t.is(e.workflowId, '2');
t.is(e.jobId, 'j1');
t.truthy(e.threadId);
t.truthy(e.versions);
t.pass('job started');
done();
});
Expand Down
Loading