From 4f5f1ddf458789c1086abb98272329e1ed7d5e2c Mon Sep 17 00:00:00 2001 From: josephjclark Date: Fri, 23 Feb 2024 17:44:30 +0000 Subject: [PATCH 1/3] Allow a workflow to support multiple adaptor versions (#610) * runtime: allow each job in a workflow to use a different adaptor version Basically instead of looking up the adaptor version from the global list of options, we calculate the version for each step and pass that through to execution * tests: add test of multiple versions * engine: prefer repoDir to adaptorPaths This lets the runtime use the repo to install the correct versions of adaptors. It might run a little slower though * runtime: allow linker options to be passed on each job * runtime: better mreging of linker options * engine: update autoinstall to write paths to the plan --- .changeset/calm-books-care.md | 6 + integration-tests/worker/test/runs.test.ts | 38 +- packages/engine-multi/src/api/autoinstall.ts | 17 +- packages/engine-multi/src/api/execute.ts | 12 +- .../engine-multi/src/worker/thread/run.ts | 9 +- .../engine-multi/test/api/autoinstall.test.ts | 52 ++- packages/lexicon/core.d.ts | 11 + packages/runtime/src/execute/compile-plan.ts | 10 + packages/runtime/src/execute/expression.ts | 28 +- packages/runtime/src/execute/step.ts | 6 +- packages/runtime/src/index.ts | 3 +- packages/runtime/src/modules/module-loader.ts | 1 - packages/runtime/src/types.ts | 5 + .../runtime/test/execute/compile-plan.test.ts | 49 +++ .../runtime/test/execute/expression.test.ts | 328 ++++++++++++------ .../test/modules/module-loader.test.ts | 2 +- packages/runtime/test/runtime.test.ts | 100 ++++++ 17 files changed, 549 insertions(+), 128 deletions(-) create mode 100644 .changeset/calm-books-care.md diff --git a/.changeset/calm-books-care.md b/.changeset/calm-books-care.md new file mode 100644 index 000000000..dafa1bd09 --- /dev/null +++ b/.changeset/calm-books-care.md @@ -0,0 +1,6 @@ +--- +'@openfn/engine-multi': minor +'@openfn/runtime': minor +--- + +Support workflows with different versions of the same adaptor diff --git a/integration-tests/worker/test/runs.test.ts b/integration-tests/worker/test/runs.test.ts index 6e9d75333..37caf94c8 100644 --- a/integration-tests/worker/test/runs.test.ts +++ b/integration-tests/worker/test/runs.test.ts @@ -44,6 +44,8 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024); const run = async (t, attempt) => { return new Promise(async (done, reject) => { lightning.on('step:complete', ({ payload }) => { + t.is(payload.reason, 'success'); + // TODO friendlier job names for this would be nice (rather than run ids) t.log( `run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb( @@ -192,7 +194,7 @@ test.serial('run parallel jobs', async (t) => { // }); }); -test('run a http adaptor job', async (t) => { +test.serial('run a http adaptor job', async (t) => { const job = createJob({ adaptor: '@openfn/language-http@5.0.4', body: `get("https://jsonplaceholder.typicode.com/todos/1"); @@ -212,3 +214,37 @@ test('run a http adaptor job', async (t) => { completed: false, }); }); + +test.serial('use different versions of the same adaptor', async (t) => { + // http@5 exported an axios global - so run this job and validate that the global is there + const job1 = createJob({ + body: `import { axios } from "@openfn/language-http"; + fn((s) => { + if (!axios) { + throw new Error('AXIOS NOT FOUND') + } + return s; + })`, + adaptor: '@openfn/language-http@5.0.4', + }); + + // http@6 no longer exports axios - so throw an error if we see it + const job2 = createJob({ + body: `import { axios } from "@openfn/language-http"; + fn((s) => { + if (axios) { + throw new Error('AXIOS FOUND') + } + return s; + })`, + adaptor: '@openfn/language-http@6.0.0', + }); + + // Just for fun, run each job a couple of times to make sure that there's no wierd caching or ordering anything + const steps = [job1, job2, job1, job2]; + const attempt = createRun([], steps, []); + + const result = await run(t, attempt); + t.log(result); + t.falsy(result.errors); +}); diff --git a/packages/engine-multi/src/api/autoinstall.ts b/packages/engine-multi/src/api/autoinstall.ts index a20113630..1eb87b961 100644 --- a/packages/engine-multi/src/api/autoinstall.ts +++ b/packages/engine-multi/src/api/autoinstall.ts @@ -112,8 +112,6 @@ const autoinstall = async (context: ExecutionContext): Promise => { } if (!skipRepoValidation && !didValidateRepo) { - // TODO what if this throws? - // Whole server probably needs to crash, so throwing is probably appropriate // TODO do we need to do it on EVERY call? Can we not cache it? await ensureRepo(repoDir, logger); didValidateRepo = true; @@ -140,9 +138,10 @@ const autoinstall = async (context: ExecutionContext): Promise => { // Write the adaptor version to the context // This is a reasonably accurate, but not totally bulletproof, report // @ts-ignore + // TODO need to remove this soon as it's basically lying context.versions[name] = v; - paths[name] = { + paths[a] = { path: `${repoDir}/node_modules/${alias}`, version: v, }; @@ -152,6 +151,18 @@ const autoinstall = async (context: ExecutionContext): Promise => { } } + // Write linker arguments back to the plan + for (const step of plan.workflow.steps) { + const job = step as unknown as Job; + if (paths[job.adaptor!]) { + const { name } = getNameAndVersion(job.adaptor!); + // @ts-ignore + job.linker = { + [name]: paths[job.adaptor!], + }; + } + } + if (adaptorsToLoad.length) { // Add this to the queue const p = enqueue(adaptorsToLoad); diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index eb52f7dea..f86c996ba 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -15,14 +15,13 @@ import { } from './lifecycle'; import preloadCredentials from './preload-credentials'; import { ExecutionError } from '../errors'; +import type { RunOptions } from '../worker/thread/run'; const execute = async (context: ExecutionContext) => { const { state, callWorker, logger, options } = context; try { - // TODO catch and "throw" nice clean autoinstall errors - const adaptorPaths = await autoinstall(context); + await autoinstall(context); - // TODO catch and "throw" nice clean compile errors try { await compile(context); } catch (e: any) { @@ -49,10 +48,9 @@ const execute = async (context: ExecutionContext) => { const whitelist = options.whitelist?.map((w) => w.toString()); const runOptions = { - adaptorPaths, - whitelist, statePropsToRemove: options.statePropsToRemove, - }; + whitelist, + } as RunOptions; const workerOptions = { memoryLimitMb: options.memoryLimitMb, @@ -109,6 +107,7 @@ const execute = async (context: ExecutionContext) => { jobError(context, evt); }, [workerEvents.LOG]: (evt: workerEvents.LogEvent) => { + // console.log(evt.log.name, evt.log.message); log(context, evt); }, // TODO this is also untested @@ -116,6 +115,7 @@ const execute = async (context: ExecutionContext) => { error(context, { workflowId: state.plan.id, error: evt.error }); }, }; + return callWorker( 'run', [state.plan, state.input || {}, runOptions || {}], diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index 9dd3585d4..6948f5931 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -10,8 +10,8 @@ import { execute, createLoggers } from './helpers'; import serializeError from '../../util/serialize-error'; import { JobErrorPayload } from '../../events'; -type RunOptions = { - adaptorPaths: Record; +export type RunOptions = { + repoDir: string; whitelist?: RegExp[]; sanitize: SanitizePolicies; statePropsToRemove?: string[]; @@ -26,8 +26,7 @@ const eventMap = { register({ run: (plan: ExecutionPlan, input: State, runOptions: RunOptions) => { - const { adaptorPaths, whitelist, sanitize, statePropsToRemove } = - runOptions; + const { repoDir, whitelist, sanitize, statePropsToRemove } = runOptions; const { logger, jobLogger, adaptorLogger } = createLoggers( plan.id!, sanitize, @@ -52,7 +51,7 @@ register({ logger, jobLogger, linker: { - modules: adaptorPaths, + repo: repoDir, whitelist, cacheKey: plan.id, }, diff --git a/packages/engine-multi/test/api/autoinstall.test.ts b/packages/engine-multi/test/api/autoinstall.test.ts index defd732bd..d1ee4b235 100644 --- a/packages/engine-multi/test/api/autoinstall.test.ts +++ b/packages/engine-multi/test/api/autoinstall.test.ts @@ -77,8 +77,9 @@ test('Autoinstall basically works', async (t) => { const context = createContext(autoinstallOpts); const paths = await autoinstall(context); + t.log(paths); t.deepEqual(paths, { - '@openfn/language-common': { + '@openfn/language-common@1.0.0': { path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', version: '1.0.0', }, @@ -263,7 +264,7 @@ test('autoinstall: handle two seperate, non-overlapping installs', async (t) => const p1 = await autoinstall(c1); t.deepEqual(p1, { - '@openfn/language-dhis2': { + '@openfn/language-dhis2@1.0.0': { path: 'tmp/repo/node_modules/@openfn/language-dhis2_1.0.0', version: '1.0.0', }, @@ -271,7 +272,7 @@ test('autoinstall: handle two seperate, non-overlapping installs', async (t) => const p2 = await autoinstall(c2); t.deepEqual(p2, { - '@openfn/language-http': { + '@openfn/language-http@1.0.0': { path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', version: '1.0.0', }, @@ -329,10 +330,53 @@ test.serial('autoinstall: return a map to modules', async (t) => { const result = await autoinstall(context); t.deepEqual(result, { + '@openfn/language-common@1.0.0': { + path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', + version: '1.0.0', + }, + '@openfn/language-http@1.0.0': { + path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', + version: '1.0.0', + }, + }); +}); + +test.serial('autoinstall: write linker options back to the plan', async (t) => { + const jobs = [ + { + adaptor: '@openfn/language-common@1.0.0', + }, + { + adaptor: '@openfn/language-common@2.0.0', + }, + { + adaptor: '@openfn/language-http@1.0.0', + }, + ]; + + const autoinstallOpts = { + skipRepoValidation: true, + handleInstall: async () => {}, + handleIsInstalled: async () => false, + }; + const context = createContext(autoinstallOpts, jobs); + + await autoinstall(context); + + const [a, b, c] = context.state.plan.workflow.steps as Job[]; + t.deepEqual(a.linker, { '@openfn/language-common': { path: 'tmp/repo/node_modules/@openfn/language-common_1.0.0', version: '1.0.0', }, + }); + t.deepEqual(b.linker, { + '@openfn/language-common': { + path: 'tmp/repo/node_modules/@openfn/language-common_2.0.0', + version: '2.0.0', + }, + }); + t.deepEqual(c.linker, { '@openfn/language-http': { path: 'tmp/repo/node_modules/@openfn/language-http_1.0.0', version: '1.0.0', @@ -363,7 +407,7 @@ test.serial('autoinstall: support custom whitelist', async (t) => { const result = await autoinstall(context); t.deepEqual(result, { - y: { + 'y@1.0.0': { path: 'tmp/repo/node_modules/y_1.0.0', version: '1.0.0', }, diff --git a/packages/lexicon/core.d.ts b/packages/lexicon/core.d.ts index ec21ec13a..08fae4f6a 100644 --- a/packages/lexicon/core.d.ts +++ b/packages/lexicon/core.d.ts @@ -31,6 +31,17 @@ export interface Job extends Step { expression: Expression; configuration?: object | string; state?: Omit | string; + + // Internal use only + // Allow module paths and versions to be overriden in the linker + // Maps to runtime.ModuleInfoMapo + linker?: Record< + string, + { + path?: string; + version?: string; + } + >; } /** diff --git a/packages/runtime/src/execute/compile-plan.ts b/packages/runtime/src/execute/compile-plan.ts index ccb692f0c..ca7c6b2cc 100644 --- a/packages/runtime/src/execute/compile-plan.ts +++ b/packages/runtime/src/execute/compile-plan.ts @@ -7,6 +7,7 @@ import type { import compileFunction from '../modules/compile-function'; import { conditionContext, Context } from './context'; import { ExecutionPlan, Job, StepEdge, Workflow } from '@openfn/lexicon'; +import { getNameAndVersion } from '../modules/repo'; const compileEdges = ( from: string, @@ -115,6 +116,7 @@ export default (plan: ExecutionPlan) => { }; for (const step of workflow.steps) { + const job = step as Job; const stepId = step.id!; const newStep: CompiledStep = { id: stepId, @@ -127,6 +129,14 @@ export default (plan: ExecutionPlan) => { 'name', ]); + if (job.linker) { + newStep.linker = job.linker; + } else if (job.adaptor) { + const job = step as Job; + const { name, version } = getNameAndVersion(job.adaptor!); + newStep.linker = { [name]: { version: version! } }; + } + if (step.next) { trapErrors(() => { newStep.next = compileEdges(stepId, step.next!, context); diff --git a/packages/runtime/src/execute/expression.ts b/packages/runtime/src/execute/expression.ts index f2f4bc20a..7b5c3e086 100644 --- a/packages/runtime/src/execute/expression.ts +++ b/packages/runtime/src/execute/expression.ts @@ -18,6 +18,7 @@ import { assertSecurityKill, } from '../errors'; import type { JobModule, ExecutionContext } from '../types'; +import { ModuleInfoMap } from '../modules/linker'; export type ExecutionErrorWrapper = { state: any; @@ -28,7 +29,10 @@ export type ExecutionErrorWrapper = { export default ( ctx: ExecutionContext, expression: string | Operation[], - input: State + input: State, + // allow custom linker options to be passed for this step + // this lets us use multiple versions of the same adaptor in a workflow + moduleOverrides?: ModuleInfoMap ) => new Promise(async (resolve, reject) => { let duration = Date.now(); @@ -42,7 +46,8 @@ export default ( const { operations, execute } = await prepareJob( expression, context, - opts + opts, + moduleOverrides ); // Create the main reducer function const reducer = (execute || defaultExecute)( @@ -122,14 +127,31 @@ export const wrapOperation = ( }; }; +export const mergeLinkerOptions = ( + options: ModuleInfoMap = {}, + overrides: ModuleInfoMap = {} +) => { + const opts: ModuleInfoMap = {}; + for (const specifier in options) { + opts[specifier] = options[specifier]; + } + for (const specifier in overrides) { + opts[specifier] = Object.assign({}, opts[specifier], overrides[specifier]); + } + return opts; +}; + const prepareJob = async ( expression: string | Operation[], context: Context, - opts: Options = {} + opts: Options = {}, + moduleOverrides: ModuleInfoMap = {} ): Promise => { if (typeof expression === 'string') { const exports = await loadModule(expression, { ...opts.linker, + // allow module paths and versions to be overriden from the defaults + modules: mergeLinkerOptions(opts.linker?.modules, moduleOverrides), context, log: opts.logger, }); diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 47ee18168..39d303bac 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -121,13 +121,17 @@ const executeStep = async ( const timerId = `step-${jobId}`; logger.timer(timerId); + + // TODO can we include the adaptor version here? + // How would we get it? logger.info(`Starting step ${jobName}`); const startTime = Date.now(); try { // TODO include the upstream job? notify(NOTIFY_JOB_START, { jobId }); - result = await executeExpression(ctx, job.expression, state); + + result = await executeExpression(ctx, job.expression, state, step.linker); } catch (e: any) { didError = true; if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) { diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index d092464ad..e47e08474 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -1,5 +1,6 @@ -import run from './runtime'; +import run, { Options } from './runtime'; export default run; +export type { Options }; import type { ModuleInfo, ModuleInfoMap } from './modules/linker'; export type { ModuleInfo, ModuleInfoMap }; diff --git a/packages/runtime/src/modules/module-loader.ts b/packages/runtime/src/modules/module-loader.ts index fa239b319..a55266ca5 100644 --- a/packages/runtime/src/modules/module-loader.ts +++ b/packages/runtime/src/modules/module-loader.ts @@ -31,7 +31,6 @@ export default async ( opts: Options = {} ): Promise => { validate(src); - const context = opts.context || vm.createContext(); const linker = opts.linker || mainLinker; diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index a869cc73a..41fc80b72 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -11,6 +11,7 @@ import { NOTIFY_INIT_START, NOTIFY_STATE_LOAD, } from './events'; +import { ModuleInfoMap } from './modules/linker'; export type CompiledEdge = | boolean @@ -23,6 +24,10 @@ export type CompiledStep = Omit & { id: StepId; next?: Record; + // custom overrides for the linker + // This lets us set version or even path per job + linker?: ModuleInfoMap; + [other: string]: any; }; diff --git a/packages/runtime/test/execute/compile-plan.test.ts b/packages/runtime/test/execute/compile-plan.test.ts index ec99bd574..809c2a664 100644 --- a/packages/runtime/test/execute/compile-plan.test.ts +++ b/packages/runtime/test/execute/compile-plan.test.ts @@ -201,6 +201,55 @@ test('should reset job ids for each call', (t) => { t.is(second.workflow.steps['job-1'].expression, 'x'); }); +test('should write adaptor versions', (t) => { + const plan = { + workflow: { + steps: [ + { + id: 'x', + expression: '.', + adaptor: 'x@1.0', + }, + { + id: 'y', + expression: '.', + adaptor: 'y@1.0', + }, + ], + }, + options: {}, + }; + + const { workflow } = compilePlan(plan); + const { x, y } = workflow.steps; + t.deepEqual(x.linker, { x: { version: '1.0' } }); + t.deepEqual(y.linker, { y: { version: '1.0' } }); +}); + +test('should preserve linker options', (t) => { + const plan = { + workflow: { + steps: [ + { + id: 'x', + expression: '.', + adaptor: 'x@1.0', + linker: { + x: { + path: 'a/b/c', + }, + }, + }, + ], + }, + options: {}, + }; + + const { workflow } = compilePlan(plan); + const { x } = workflow.steps; + t.deepEqual(x.linker, { x: { path: 'a/b/c' } }); +}); + test('should set the start to steps[0]', (t) => { const plan: ExecutionPlan = { workflow: { diff --git a/packages/runtime/test/execute/expression.test.ts b/packages/runtime/test/execute/expression.test.ts index 5b14567e4..c60cf5d66 100644 --- a/packages/runtime/test/execute/expression.test.ts +++ b/packages/runtime/test/execute/expression.test.ts @@ -3,7 +3,7 @@ import { fn } from '@openfn/language-common'; import { createMockLogger } from '@openfn/logger'; import type { Operation, State } from '@openfn/lexicon'; -import execute from '../../src/execute/expression'; +import execute, { mergeLinkerOptions } from '../../src/execute/expression'; import type { ExecutionContext } from '../../src/types'; type TestState = State & { @@ -40,7 +40,7 @@ test.afterEach(() => { // This is convenient in testing as it's easier to catch errors // Note that the linker and module loader do heavier testing of strings -test('run a live no-op job with one operation', async (t) => { +test.serial('run a live no-op job with one operation', async (t) => { const job = [(s: State) => s]; const state = createState(); const context = createContext(); @@ -50,7 +50,7 @@ test('run a live no-op job with one operation', async (t) => { t.deepEqual(state, result); }); -test('run a stringified no-op job with one operation', async (t) => { +test.serial('run a stringified no-op job with one operation', async (t) => { const job = 'export default [(s) => s]'; const state = createState(); const context = createContext(); @@ -60,17 +60,20 @@ test('run a stringified no-op job with one operation', async (t) => { t.deepEqual(state, result); }); -test('run a live no-op job with @openfn/language-common.fn', async (t) => { - const job = [fn((s) => s)]; - const state = createState(); - const context = createContext(); +test.serial( + 'run a live no-op job with @openfn/language-common.fn', + async (t) => { + const job = [fn((s) => s)]; + const state = createState(); + const context = createContext(); - const result = await execute(context, job, state); + const result = await execute(context, job, state); - t.deepEqual(state, result); -}); + t.deepEqual(state, result); + } +); -test('jobs can handle a promise', async (t) => { +test.serial('jobs can handle a promise', async (t) => { const job = [async (s: State) => s]; const state = createState(); const context = createContext(); @@ -80,7 +83,7 @@ test('jobs can handle a promise', async (t) => { t.deepEqual(state, result); }); -test('output state should be serializable', async (t) => { +test.serial('output state should be serializable', async (t) => { const job = [async (s: State) => s]; const circular = {}; @@ -101,57 +104,72 @@ test('output state should be serializable', async (t) => { t.falsy(result.data.fn); }); -test('configuration is removed from the result by default', async (t) => { - const job = [async (s: State) => s]; - const context = createContext(); - - const result = await execute(context, job, { configuration: {} }); - t.deepEqual(result, {}); -}); - -test('statePropsToRemove removes multiple props from state', async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x', 'y']; - const context = createContext({}, { statePropsToRemove }); - - const result = await execute(context, job, { x: 1, y: 1, z: 1 }); - t.deepEqual(result, { z: 1 }); -}); - -test('statePropsToRemove logs to debug when a prop is removed', async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x']; - - const context = createContext({}, { statePropsToRemove }); - - const result = await execute(context, job, { x: 1, y: 1, z: 1 }); - t.deepEqual(result, { y: 1, z: 1 }); - - const log = logger._find('debug', /removed x from final state/i); - t.truthy(log); -}); - -test('no props are removed from state if an empty array is passed to statePropsToRemove', async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x', 'y']; - const context = createContext({}, { statePropsToRemove }); - - const state = { x: 1, configuration: 1 }; - const result = await execute(context, job, state as any); - t.deepEqual(result, state); -}); - -test('no props are removed from state if a falsy value is passed to statePropsToRemove', async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = undefined; - const context = createContext({}, { statePropsToRemove }); - - const state = { x: 1, configuration: 1 }; - const result = await execute(context, job, state as any); - t.deepEqual(result, state); -}); - -test('config is removed from the result', async (t) => { +test.serial( + 'configuration is removed from the result by default', + async (t) => { + const job = [async (s: State) => s]; + const context = createContext(); + + const result = await execute(context, job, { configuration: {} }); + t.deepEqual(result, {}); + } +); + +test.serial( + 'statePropsToRemove removes multiple props from state', + async (t) => { + const job = [async (s: State) => s]; + const statePropsToRemove = ['x', 'y']; + const context = createContext({}, { statePropsToRemove }); + + const result = await execute(context, job, { x: 1, y: 1, z: 1 }); + t.deepEqual(result, { z: 1 }); + } +); + +test.serial( + 'statePropsToRemove logs to debug when a prop is removed', + async (t) => { + const job = [async (s: State) => s]; + const statePropsToRemove = ['x']; + + const context = createContext({}, { statePropsToRemove }); + + const result = await execute(context, job, { x: 1, y: 1, z: 1 }); + t.deepEqual(result, { y: 1, z: 1 }); + + const log = logger._find('debug', /removed x from final state/i); + t.truthy(log); + } +); + +test.serial( + 'no props are removed from state if an empty array is passed to statePropsToRemove', + async (t) => { + const job = [async (s: State) => s]; + const statePropsToRemove = ['x', 'y']; + const context = createContext({}, { statePropsToRemove }); + + const state = { x: 1, configuration: 1 }; + const result = await execute(context, job, state as any); + t.deepEqual(result, state); + } +); + +test.serial( + 'no props are removed from state if a falsy value is passed to statePropsToRemove', + async (t) => { + const job = [async (s: State) => s]; + const statePropsToRemove = undefined; + const context = createContext({}, { statePropsToRemove }); + + const state = { x: 1, configuration: 1 }; + const result = await execute(context, job, state as any); + t.deepEqual(result, state); + } +); + +test.serial('config is removed from the result', async (t) => { const job = [async (s: State) => s]; const context = createContext({ opts: {} }); @@ -159,26 +177,29 @@ test('config is removed from the result', async (t) => { t.deepEqual(result, {}); }); -test('output state is returned verbatim, apart from config', async (t) => { - const state = { - data: {}, - references: [], - configuration: {}, - x: true, - }; - const job = [async () => ({ ...state })]; - - const context = createContext(); - - const result = await execute(context, job, {}); - t.deepEqual(result, { - data: {}, - references: [], - x: true, - }); -}); - -test('operations run in series', async (t) => { +test.serial( + 'output state is returned verbatim, apart from config', + async (t) => { + const state = { + data: {}, + references: [], + configuration: {}, + x: true, + }; + const job = [async () => ({ ...state })]; + + const context = createContext(); + + const result = await execute(context, job, {}); + t.deepEqual(result, { + data: {}, + references: [], + x: true, + }); + } +); + +test.serial('operations run in series', async (t) => { const job = [ (s: TestState) => { s.data.x = 2; @@ -204,7 +225,7 @@ test('operations run in series', async (t) => { t.is(result.data.x, 12); }); -test('async operations run in series', async (t) => { +test.serial('async operations run in series', async (t) => { const job = [ (s: TestState) => { s.data.x = 2; @@ -234,7 +255,7 @@ test('async operations run in series', async (t) => { t.is(result.data.x, 12); }); -test('jobs can return undefined', async (t) => { +test.serial('jobs can return undefined', async (t) => { // @ts-ignore violating the operation contract here const job = [() => undefined] as Operation[]; @@ -246,7 +267,7 @@ test('jobs can return undefined', async (t) => { t.assert(result === undefined); }); -test('jobs can mutate the original state', async (t) => { +test.serial('jobs can mutate the original state', async (t) => { const job = [ (s: TestState) => { s.data.x = 2; @@ -262,7 +283,7 @@ test('jobs can mutate the original state', async (t) => { t.is(result.data.x, 2); }); -test('jobs do not mutate the original state', async (t) => { +test.serial('jobs do not mutate the original state', async (t) => { const job = [ (s: TestState) => { s.data.x = 2; @@ -278,25 +299,28 @@ test('jobs do not mutate the original state', async (t) => { t.is(result.data.x, 2); }); -test('forwards a logger to the console object inside a job', async (t) => { - const logger = createMockLogger(undefined, { level: 'info' }); +test.serial( + 'forwards a logger to the console object inside a job', + async (t) => { + const logger = createMockLogger(undefined, { level: 'info' }); - // We must define this job as a module so that it binds to the sandboxed context - const job = ` + // We must define this job as a module so that it binds to the sandboxed context + const job = ` export default [ (s) => { console.log("x"); return s; } ];`; - const state = createState(); - const context = createContext({ opts: { jobLogger: logger } }); - await execute(context, job, state); + const state = createState(); + const context = createContext({ opts: { jobLogger: logger } }); + await execute(context, job, state); - const output = logger._parse(logger._last); - t.is(output.level, 'info'); - t.is(output.message, 'x'); -}); + const output = logger._parse(logger._last); + t.is(output.level, 'info'); + t.is(output.message, 'x'); + } +); -test('calls execute if exported from a job', async (t) => { +test.serial('calls execute if exported from a job', async (t) => { const logger = createMockLogger(undefined, { level: 'info' }); // The execute function, if called by the runtime, will send a specific @@ -324,7 +348,7 @@ test.skip('Throws after default timeout', async (t) => { }); }); -test('Throws after custom timeout', async (t) => { +test.serial('Throws after custom timeout', async (t) => { const logger = createMockLogger(undefined, { level: 'info' }); const job = `export default [() => new Promise((resolve) => setTimeout(resolve, 100))];`; @@ -340,15 +364,115 @@ test('Throws after custom timeout', async (t) => { }); }); -test('Operations log on start and end', async (t) => { +test.serial('Operations log on start and end', async (t) => { const job = [(s: State) => s]; const state = createState(); const context = createContext(); await execute(context, job, state); - const start = logger._find('debug', /starting operation /i); t.truthy(start); const end = logger._find('debug', /operation 1 complete in \dms/i); t.truthy(end); }); + +test.serial('mergeLinkerOptions: use linker options only', (t) => { + const map = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const result = mergeLinkerOptions(map); + t.deepEqual(result, map); +}); + +test.serial('mergeLinkerOptions: use override options only', (t) => { + const map = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const result = mergeLinkerOptions(undefined, map); + t.deepEqual(result, map); +}); + +test.serial('mergeLinkerOptions: override path and value', (t) => { + const options = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const override = { + x: { + path: 'x/y/z', + version: '2.0.0', + }, + }; + const result = mergeLinkerOptions(options, override); + t.deepEqual(result, override); +}); + +test.serial('mergeLinkerOptions: override path only', (t) => { + const options = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const override = { + x: { + path: 'x/y/z', + }, + }; + const result = mergeLinkerOptions(options, override); + t.deepEqual(result, { + x: { + path: 'x/y/z', + version: '1.0.0', + }, + }); +}); + +test.serial('mergeLinkerOptions: override version only', (t) => { + const options = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const override = { + x: { + version: '2.0.0', + }, + }; + const result = mergeLinkerOptions(options, override); + t.deepEqual(result, { + x: { + path: 'a/b/c', + version: '2.0.0', + }, + }); +}); + +test.serial('mergeLinkerOptions: merge multiple adaptors', (t) => { + const options = { + x: { + path: 'a/b/c', + version: '1.0.0', + }, + }; + const override = { + y: { + path: 'x/y/z', + version: '2.0.0', + }, + }; + const result = mergeLinkerOptions(options, override); + t.deepEqual(result, { + ...options, + ...override, + }); +}); diff --git a/packages/runtime/test/modules/module-loader.test.ts b/packages/runtime/test/modules/module-loader.test.ts index dbbdcfb52..1ed7306ec 100644 --- a/packages/runtime/test/modules/module-loader.test.ts +++ b/packages/runtime/test/modules/module-loader.test.ts @@ -45,7 +45,7 @@ test('load a module with an import', async (t) => { t.assert(m.default === 20); }); -test('load a module with aribtrary exports', async (t) => { +test('load a module with aribitrary exports', async (t) => { const src = 'export const x = 10; export const y = 20;'; const m = await loadModule(src); diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index e7f8af39e..bfafc694a 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -598,6 +598,106 @@ test('run from an adaptor', async (t) => { t.deepEqual(result, { data: 22 }); }); +test('run a workflow using the repo and load the default version', async (t) => { + const expression = ` + import result from 'ultimate-answer'; + export default [() => result]; + `; + const plan = { + workflow: { + steps: [ + { + id: 'a', + expression, + }, + ], + }, + }; + + const result: any = await run( + plan, + {}, + { + linker: { + repo: path.resolve('test/__repo__'), + }, + } + ); + + t.deepEqual(result, 43); +}); + +test('run a workflow using the repo using a specific version', async (t) => { + const expression = ` + import result from 'ultimate-answer'; + export default [() => result]; + `; + const plan = { + workflow: { + steps: [ + { + id: 'a', + expression, + }, + ], + }, + }; + + const result: any = await run( + plan, + {}, + { + linker: { + repo: path.resolve('test/__repo__'), + modules: { + 'ultimate-answer': { version: '1.0.0' }, + }, + }, + } + ); + + t.deepEqual(result, 42); +}); + +test('run a workflow using the repo with multiple versions of the same adaptor', async (t) => { + const plan = { + workflow: { + steps: [ + { + id: 'a', + expression: `import result from 'ultimate-answer'; + export default [(s) => { s.data.a = result; return s;}];`, + adaptor: 'ultimate-answer@1.0.0', + next: { b: true }, + }, + { + id: 'b', + expression: `import result from 'ultimate-answer'; + export default [(s) => { s.data.b = result; return s;}];`, + adaptor: 'ultimate-answer@2.0.0', + }, + ], + }, + }; + + const result: any = await run( + plan, + {}, + { + linker: { + repo: path.resolve('test/__repo__'), + }, + } + ); + + t.deepEqual(result, { + data: { + a: 42, + b: 43, + }, + }); +}); + // https://github.com/OpenFn/kit/issues/520 test('run from an adaptor with error', async (t) => { const expression = ` From 58e0d11489cc831963422a0fb188895852d1fa9b Mon Sep 17 00:00:00 2001 From: josephjclark Date: Fri, 23 Feb 2024 17:56:37 +0000 Subject: [PATCH 2/3] Worker: Update version listings (#611) * engine: publish versions with workflow-start and support multiple adaptors * worker: update version handling We only include versions on workflow start and allow multiple adaptor versions to be printed --- .changeset/fair-bobcats-applaud.md | 5 + .changeset/yellow-clouds-thank.md | 5 + packages/engine-multi/src/api/autoinstall.ts | 12 ++- packages/engine-multi/src/api/lifecycle.ts | 2 +- .../src/classes/ExecutionContext.ts | 2 +- packages/engine-multi/src/events.ts | 5 +- packages/engine-multi/src/types.ts | 3 +- .../engine-multi/test/api/autoinstall.test.ts | 4 +- .../engine-multi/test/api/execute.test.ts | 7 +- .../engine-multi/test/api/lifecycle.test.ts | 7 +- .../engine-multi/test/integration.test.ts | 2 +- packages/ws-worker/src/api/execute.ts | 18 +--- packages/ws-worker/src/events/run-start.ts | 47 ++++++++ packages/ws-worker/src/events/step-start.ts | 47 +------- packages/ws-worker/src/mock/runtime-engine.ts | 1 + packages/ws-worker/src/util/versions.ts | 18 ++-- packages/ws-worker/test/api/execute.test.ts | 12 --- .../ws-worker/test/events/run-start.test.ts | 101 ++++++++++++++++++ .../ws-worker/test/events/step-start.test.ts | 98 ----------------- packages/ws-worker/test/lightning.test.ts | 55 ++++++---- packages/ws-worker/test/util/versions.test.ts | 30 ++++-- 21 files changed, 254 insertions(+), 227 deletions(-) create mode 100644 .changeset/fair-bobcats-applaud.md create mode 100644 .changeset/yellow-clouds-thank.md create mode 100644 packages/ws-worker/src/events/run-start.ts create mode 100644 packages/ws-worker/test/events/run-start.test.ts diff --git a/.changeset/fair-bobcats-applaud.md b/.changeset/fair-bobcats-applaud.md new file mode 100644 index 000000000..bc5da4f74 --- /dev/null +++ b/.changeset/fair-bobcats-applaud.md @@ -0,0 +1,5 @@ +--- +'@openfn/engine-multi': patch +--- + +Record adaptor versions as an array diff --git a/.changeset/yellow-clouds-thank.md b/.changeset/yellow-clouds-thank.md new file mode 100644 index 000000000..fabbc3d3b --- /dev/null +++ b/.changeset/yellow-clouds-thank.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Move version log to workflow start diff --git a/packages/engine-multi/src/api/autoinstall.ts b/packages/engine-multi/src/api/autoinstall.ts index 1eb87b961..1264d8870 100644 --- a/packages/engine-multi/src/api/autoinstall.ts +++ b/packages/engine-multi/src/api/autoinstall.ts @@ -135,11 +135,13 @@ const autoinstall = async (context: ExecutionContext): Promise => { const v = version || 'unknown'; - // Write the adaptor version to the context - // This is a reasonably accurate, but not totally bulletproof, report - // @ts-ignore - // TODO need to remove this soon as it's basically lying - context.versions[name] = v; + // Write the adaptor version to the context for reporting later + if (!context.versions[name]) { + context.versions[name] = []; + } + if (!context.versions[name].includes(v)) { + (context.versions[name] as string[]).push(v); + } paths[a] = { path: `${repoDir}/node_modules/${alias}`, diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index f7c71101b..7b66ce6ae 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -38,6 +38,7 @@ export const workflowStart = ( // forward the event on to any external listeners context.emit(externalEvents.WORKFLOW_START, { threadId, + versions: context.versions, }); }; @@ -81,7 +82,6 @@ export const jobStart = ( context.emit(externalEvents.JOB_START, { jobId, threadId, - versions: context.versions, }); }; diff --git a/packages/engine-multi/src/classes/ExecutionContext.ts b/packages/engine-multi/src/classes/ExecutionContext.ts index 0e7c70480..db6c90c49 100644 --- a/packages/engine-multi/src/classes/ExecutionContext.ts +++ b/packages/engine-multi/src/classes/ExecutionContext.ts @@ -12,7 +12,7 @@ import type { import type { ExternalEvents, EventMap } from '../events'; /** - * The ExeuctionContext class wraps an event emitter with some useful context + * The ExecutionContext class wraps an event emitter with some useful context * and automatically appends the workflow id to each emitted events * * Each running workflow has its own context object diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index 4dc1d63e6..f32c5eaca 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -49,7 +49,9 @@ interface ExternalEvent { workflowId: string; } -export interface WorkflowStartPayload extends ExternalEvent {} +export interface WorkflowStartPayload extends ExternalEvent { + versions: Versions; +} export interface WorkflowCompletePayload extends ExternalEvent { state: any; @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent { export interface JobStartPayload extends ExternalEvent { jobId: string; - versions: Versions; } export interface JobCompletePayload extends ExternalEvent { diff --git a/packages/engine-multi/src/types.ts b/packages/engine-multi/src/types.ts index bc69b7445..76bead978 100644 --- a/packages/engine-multi/src/types.ts +++ b/packages/engine-multi/src/types.ts @@ -83,5 +83,6 @@ export type Versions = { node: string; engine: string; compiler: string; - [adaptor: string]: string; + + [adaptor: string]: string | string[]; }; diff --git a/packages/engine-multi/test/api/autoinstall.test.ts b/packages/engine-multi/test/api/autoinstall.test.ts index d1ee4b235..e63c306d3 100644 --- a/packages/engine-multi/test/api/autoinstall.test.ts +++ b/packages/engine-multi/test/api/autoinstall.test.ts @@ -565,7 +565,7 @@ test('write versions to context', async (t) => { await autoinstall(context); // @ts-ignore - t.is(context.versions['@openfn/language-common'], '1.0.0'); + t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); }); test("write versions to context even if we don't install", async (t) => { @@ -578,5 +578,5 @@ test("write versions to context even if we don't install", async (t) => { await autoinstall(context); // @ts-ignore - t.is(context.versions['@openfn/language-common'], '1.0.0'); + t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']); }); diff --git a/packages/engine-multi/test/api/execute.test.ts b/packages/engine-multi/test/api/execute.test.ts index 9b46e2a74..e20727c1e 100644 --- a/packages/engine-multi/test/api/execute.test.ts +++ b/packages/engine-multi/test/api/execute.test.ts @@ -86,6 +86,7 @@ test.serial('should emit a workflow-start event', async (t) => { id: 'x', plan, } as WorkflowState; + let workflowStart; const context = createContext({ state, options }); @@ -96,6 +97,9 @@ test.serial('should emit a workflow-start event', async (t) => { // No need to do a deep test of the event payload here t.is(workflowStart!.workflowId!, 'x'); + // Just a shallow test on the actual version object to verify that it's been attached + t.truthy(workflowStart!.versions); + t.regex(workflowStart!.versions.node, new RegExp(/(\d+).(\d+).\d+/)); }); test.serial('should emit a log event with the memory limit', async (t) => { @@ -156,9 +160,6 @@ test.serial('should emit a job-start event', async (t) => { await execute(context); t.is(event.jobId, 'j'); - t.truthy(event.versions); - // Just a shallow test on the actual version object to verify that it's been attached - t.regex(event.versions.node, new RegExp(/(\d+).(\d+).\d+/)); }); test.serial('should emit a job-complete event', async (t) => { diff --git a/packages/engine-multi/test/api/lifecycle.test.ts b/packages/engine-multi/test/api/lifecycle.test.ts index 329128e5d..05896c20a 100644 --- a/packages/engine-multi/test/api/lifecycle.test.ts +++ b/packages/engine-multi/test/api/lifecycle.test.ts @@ -36,10 +36,9 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => { }; context.on(e.WORKFLOW_START, (evt) => { - t.deepEqual(evt, { - workflowId, - threadId: '123', - }); + t.truthy(evt.versions); + t.is(evt.workflowId, workflowId); + t.is(evt.threadId, '123'); done(); }); diff --git a/packages/engine-multi/test/integration.test.ts b/packages/engine-multi/test/integration.test.ts index 21c34e2cd..93a489850 100644 --- a/packages/engine-multi/test/integration.test.ts +++ b/packages/engine-multi/test/integration.test.ts @@ -56,6 +56,7 @@ test.serial('trigger workflow-start', (t) => { api.execute(plan, emptyState).on('workflow-start', (evt) => { t.is(evt.workflowId, plan.id); t.truthy(evt.threadId); + t.truthy(evt.versions); t.pass('workflow started'); done(); }); @@ -77,7 +78,6 @@ test.serial('trigger job-start', (t) => { t.is(e.workflowId, '2'); t.is(e.jobId, 'j1'); t.truthy(e.threadId); - t.truthy(e.versions); t.pass('job started'); done(); }); diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index c35ea2ca9..6cac55c12 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -1,11 +1,7 @@ import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon'; -import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning'; +import type { RunLogPayload } from '@openfn/lexicon/lightning'; import type { Logger } from '@openfn/logger'; -import type { - RuntimeEngine, - Resolvers, - WorkflowStartPayload, -} from '@openfn/engine-multi'; +import type { RuntimeEngine, Resolvers } from '@openfn/engine-multi'; import { getWithReply, @@ -21,6 +17,7 @@ import { STEP_START, GET_CREDENTIAL, } from '../events'; +import handleRunStart from '../events/run-start'; import handleStepComplete from '../events/step-complete'; import handleStepStart from '../events/step-start'; import handleRunComplete from '../events/run-complete'; @@ -114,7 +111,7 @@ export function execute( // so that they send in order const listeners = Object.assign( {}, - addEvent('workflow-start', throttle(onWorkflowStart)), + addEvent('workflow-start', throttle(handleRunStart)), addEvent('job-start', throttle(handleStepStart)), addEvent('job-complete', throttle(handleStepComplete)), addEvent('job-error', throttle(onJobError)), @@ -213,13 +210,6 @@ export function onJobError(context: Context, event: any) { } } -export function onWorkflowStart( - { channel }: Context, - _event: WorkflowStartPayload -) { - return sendEvent(channel, RUN_START); -} - export function onJobLog({ channel, state }: Context, event: JSONLog) { const timeInMicroseconds = BigInt(event.time) / BigInt(1e3); diff --git a/packages/ws-worker/src/events/run-start.ts b/packages/ws-worker/src/events/run-start.ts new file mode 100644 index 000000000..8845f5446 --- /dev/null +++ b/packages/ws-worker/src/events/run-start.ts @@ -0,0 +1,47 @@ +import type { RunStartPayload } from '@openfn/lexicon/lightning'; +import { timestamp } from '@openfn/logger'; +import type { WorkflowStartPayload } from '@openfn/engine-multi'; + +import { RUN_START } from '../events'; +import { sendEvent, Context, onJobLog } from '../api/execute'; +import calculateVersionString from '../util/versions'; + +import pkg from '../../package.json' assert { type: 'json' }; + +export default async function onRunStart( + context: Context, + event: WorkflowStartPayload +) { + const { channel, state } = context; + // Cheat on the timestamp time to make sure this is the first thing in the log + const time = (timestamp() - BigInt(10e6)).toString(); + + // Send the log with its own little state object + // to preserve the run id + // Otherwise, by the time the log sends, + // the active step could have changed + // TODO if I fix ordering I think I can kill this + const versionLogContext = { + ...context, + state: { + ...state, + activeStep: state.activeStep, + }, + }; + + const versions = { + worker: pkg.version, + ...event.versions, + }; + + await sendEvent(channel, RUN_START, { versions }); + + const versionMessage = calculateVersionString(versions); + + await onJobLog(versionLogContext, { + time, + message: [versionMessage], + level: 'info', + name: 'VER', + }); +} diff --git a/packages/ws-worker/src/events/step-start.ts b/packages/ws-worker/src/events/step-start.ts index 561652431..3eea012e1 100644 --- a/packages/ws-worker/src/events/step-start.ts +++ b/packages/ws-worker/src/events/step-start.ts @@ -1,70 +1,25 @@ import crypto from 'node:crypto'; -import { timestamp } from '@openfn/logger'; import { JobStartPayload } from '@openfn/engine-multi'; -import type { Job } from '@openfn/lexicon'; import type { StepStartPayload } from '@openfn/lexicon/lightning'; -import pkg from '../../package.json' assert { type: 'json' }; import { STEP_START } from '../events'; -import { sendEvent, Context, onJobLog } from '../api/execute'; -import calculateVersionString from '../util/versions'; +import { sendEvent, Context } from '../api/execute'; export default async function onStepStart( context: Context, event: JobStartPayload ) { - // Cheat on the timestamp time to make sure this is the first thing in the log - const time = (timestamp() - BigInt(10e6)).toString(); - const { channel, state } = context; // generate a run id and write it to state state.activeStep = crypto.randomUUID(); state.activeJob = event.jobId; - const job = state.plan.workflow.steps.find( - ({ id }) => id === event.jobId - ) as Job; - const input_dataclip_id = state.inputDataclips[event.jobId]; - const versions = { - worker: pkg.version, - ...event.versions, - }; - - // Send the log with its own little state object - // to preserve the run id - // Otherwise, by the time the log sends, - // the active step could have changed - // TODO if I fix ordering I think I can kill this - const versionLogContext = { - ...context, - state: { - ...state, - activeStep: state.activeStep, - }, - }; - await sendEvent(channel, STEP_START, { step_id: state.activeStep!, job_id: state.activeJob!, input_dataclip_id, - - versions, - }); - - const versionMessage = calculateVersionString( - versionLogContext.state.activeStep, - versions, - job?.adaptor - ); - - await onJobLog(versionLogContext, { - time, - message: [versionMessage], - level: 'info', - name: 'VER', }); - return; } diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index f96541056..6e0c9ef46 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -98,6 +98,7 @@ async function createMock() { level: 'info', json: true, message: JSON.stringify(args), + name: 'JOB', time: Date.now(), }); }, diff --git a/packages/ws-worker/src/util/versions.ts b/packages/ws-worker/src/util/versions.ts index 8c3ba4383..4c955b72e 100644 --- a/packages/ws-worker/src/util/versions.ts +++ b/packages/ws-worker/src/util/versions.ts @@ -6,33 +6,35 @@ export type Versions = { node: string; worker: string; - [adaptor: string]: string; + [adaptor: string]: string | string[]; }; -export default (stepId: string, versions: Versions, adaptor?: string) => { +export default (versions: Versions) => { let longest = 'worker'.length; // Bit wierdly defensive but ensure padding is reasonable even if version has no props for (const v in versions) { longest = Math.max(v.length, longest); } - const { node, worker, ...adaptors } = versions; + const { node, worker, compiler, runtime, engine, ...adaptors } = versions; // Prefix and pad version numbers const prefix = (str: string) => ` ${t} ${str.padEnd(longest + 4, ' ')}`; - let str = `Versions for step ${stepId}: + let str = `Versions: ${prefix('node.js')}${versions.node || 'unknown'} ${prefix('worker')}${versions.worker || 'unknown'}`; if (Object.keys(adaptors).length) { let allAdaptors = Object.keys(adaptors); - if (adaptor) { - allAdaptors = allAdaptors.filter((name) => adaptor.startsWith(name)); - } str += '\n' + allAdaptors .sort() - .map((adaptorName) => `${prefix(adaptorName)}${adaptors[adaptorName]}`) + .map( + (adaptorName) => + `${prefix(adaptorName)}${(adaptors[adaptorName] as string[]) + .sort() + .join(', ')}` + ) .join('\n'); } diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 4c74bfb65..818b2c08f 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -14,7 +14,6 @@ import { import { onJobLog, execute, - onWorkflowStart, loadDataclip, loadCredential, sendEvent, @@ -181,17 +180,6 @@ test('jobError should trigger step:complete with a reason and default state', as t.deepEqual(stepCompleteEvent.output_dataclip, '{}'); }); -test('workflowStart should send an empty run:start event', async (t) => { - const channel = mockChannel({ - [RUN_START]: () => { - t.pass(); - }, - }); - - // @ts-ignore - await onWorkflowStart({ channel }); -}); - // test('workflowComplete should send an run:complete event', async (t) => { // const result = { answer: 42 }; diff --git a/packages/ws-worker/test/events/run-start.test.ts b/packages/ws-worker/test/events/run-start.test.ts new file mode 100644 index 000000000..4eab3a903 --- /dev/null +++ b/packages/ws-worker/test/events/run-start.test.ts @@ -0,0 +1,101 @@ +import test from 'ava'; +import type { WorkflowStartPayload } from '@openfn/engine-multi'; + +import handleRunStart from '../../src/events/run-start'; +import { mockChannel } from '../../src/mock/sockets'; +import { createRunState } from '../../src/util'; +import { RUN_LOG, RUN_START } from '../../src/events'; + +import pkg from '../../package.json' assert { type: 'json' }; + +test('run:start event should include versions', async (t) => { + const plan = { + id: 'run-1', + workflow: { + steps: [{ id: 'job-1', expression: '.' }], + }, + options: {}, + }; + const input = 'abc'; + const jobId = 'job-1'; + + const versions = { + node: process.version.substring(1), + '@openfn/language-common': ['1.0.0'], + }; + + // Simulate an event that would be generated by the worker + const event: WorkflowStartPayload = { + workflowId: plan.id, + // @ts-ignore + versions, + }; + + const state = createRunState(plan, input); + state.activeJob = jobId; + state.activeStep = 'b'; + + const channel = mockChannel({ + [RUN_START]: (evt) => { + t.deepEqual(evt.versions, { + ...versions, + worker: pkg.version, + }); + return true; + }, + [RUN_LOG]: () => true, + }); + + await handleRunStart({ channel, state } as any, event); +}); + +test('run:start should log the version number', async (t) => { + let logEvent: any; + const plan = { + id: 'run-1', + workflow: { + steps: [{ id: 'job-1', expression: '.' }], + }, + options: {}, + }; + const input = 'abc'; + const jobId = 'job-1'; + + const versions = { + node: process.version.substring(1), + engine: '1.0.0', + compiler: '1.0.0', + worker: pkg.version, + '@openfn/language-common': ['1.0.0'], + }; + + // Simulate an event that would be generated by the worker + const event: WorkflowStartPayload = { + workflowId: plan.id, + versions, + }; + + const state = createRunState(plan, input); + state.activeJob = jobId; + state.activeStep = 'b'; + + const channel = mockChannel({ + [RUN_START]: () => true, + [RUN_LOG]: (evt) => { + if (evt.source === 'VER') { + logEvent = evt; + } + return true; + }, + }); + + await handleRunStart({ channel, state } as any, event); + + t.truthy(logEvent); + t.is(logEvent.level, 'info'); + const [message] = logEvent.message; + t.log(message); + // This just a light test of the string to make sure it's here + // It uses src/util/versions, which is tested elsewhere + t.regex(message, /(node\.js).+(worker).+(@openfn\/language-common)/is); +}); diff --git a/packages/ws-worker/test/events/step-start.test.ts b/packages/ws-worker/test/events/step-start.test.ts index e97b69a61..343ef5bd0 100644 --- a/packages/ws-worker/test/events/step-start.test.ts +++ b/packages/ws-worker/test/events/step-start.test.ts @@ -1,14 +1,10 @@ import test from 'ava'; import handleStepStart from '../../src/events/step-start'; -import { JobStartPayload } from '@openfn/engine-multi'; - import { mockChannel } from '../../src/mock/sockets'; import { createRunState } from '../../src/util'; import { RUN_LOG, STEP_START } from '../../src/events'; -import pkg from '../../package.json' assert { type: 'json' }; - test('set a step id and active job on state', async (t) => { const plan = { id: 'run-1', @@ -60,97 +56,3 @@ test('send a step:start event', async (t) => { await handleStepStart({ channel, state } as any, { jobId } as any); }); - -test('step:start event should include versions', async (t) => { - const plan = { - id: 'run-1', - workflow: { - steps: [{ id: 'job-1', expression: '.' }], - }, - options: {}, - }; - const input = 'abc'; - const jobId = 'job-1'; - - const versions = { - node: process.version.substring(1), - engine: '1.0.0', - compiler: '1.0.0', - worker: pkg.version, - }; - - // Simulate an event that would be generated by the worker - const event: JobStartPayload = { - jobId, - workflowId: plan.id, - versions, - }; - - const state = createRunState(plan, input); - state.activeJob = jobId; - state.activeStep = 'b'; - - const channel = mockChannel({ - [STEP_START]: (evt) => { - t.deepEqual(evt.versions, { - ...versions, - worker: pkg.version, - }); - return true; - }, - [RUN_LOG]: () => true, - }); - - await handleStepStart({ channel, state } as any, event); -}); - -test('also logs the version number', async (t) => { - let logEvent: any; - const plan = { - id: 'run-1', - workflow: { - steps: [{ id: 'job-1', expression: '.' }], - }, - options: {}, - }; - const input = 'abc'; - const jobId = 'job-1'; - - const versions = { - node: process.version.substring(1), - engine: '1.0.0', - compiler: '1.0.0', - worker: pkg.version, - }; - - // Simulate an event that would be generated by the worker - const event: JobStartPayload = { - jobId, - workflowId: plan.id, - versions, - }; - - const state = createRunState(plan, input); - state.activeJob = jobId; - state.activeStep = 'b'; - - const channel = mockChannel({ - [STEP_START]: () => true, - [RUN_LOG]: (evt) => { - if (evt.source === 'VER') { - logEvent = evt; - } - return true; - }, - }); - - await handleStepStart({ channel, state } as any, event); - - t.truthy(logEvent); - t.is(logEvent.level, 'info'); - const [message] = logEvent.message; - t.log(message); - // This just a light test of the string to make sure it's here - // It uses src/util/versions, which is tested elsewhere - t.regex(message, /(node\.js).+(worker).+(engine)/is); -}); diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 1fe55c90a..bc40d2133 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -402,15 +402,22 @@ test.serial(`events: lightning should receive a ${e.RUN_LOG} event`, (t) => { ], }; - lng.onSocketEvent(e.RUN_LOG, run.id, ({ payload }: any) => { - const log = payload; - - t.is(log.level, 'info'); - t.truthy(log.run_id); - t.truthy(log.step_id); - t.truthy(log.message); - t.deepEqual(log.message, ['x']); - }); + lng.onSocketEvent( + e.RUN_LOG, + run.id, + ({ payload }: any) => { + if (payload.source === 'JOB') { + const log = payload; + + t.is(log.level, 'info'); + t.truthy(log.run_id); + t.truthy(log.step_id); + t.truthy(log.message); + t.deepEqual(log.message, ['x']); + } + }, + false + ); lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { done(); @@ -712,11 +719,16 @@ test.serial(`worker should send a success reason in the logs`, (t) => { ], }; - lng.onSocketEvent(e.RUN_LOG, run.id, ({ payload }: any) => { - if (payload.message[0].match(/Run complete with status: success/)) { - log = payload.message[0]; - } - }); + lng.onSocketEvent( + e.RUN_LOG, + run.id, + ({ payload }: any) => { + if (payload.message[0].match(/Run complete with status: success/)) { + log = payload.message[0]; + } + }, + false + ); lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { t.truthy(log); @@ -740,11 +752,16 @@ test.serial(`worker should send a fail reason in the logs`, (t) => { ], }; - lng.onSocketEvent(e.RUN_LOG, run.id, ({ payload }: any) => { - if (payload.message[0].match(/Run complete with status: fail/)) { - log = payload.message[0]; - } - }); + lng.onSocketEvent( + e.RUN_LOG, + run.id, + ({ payload }: any) => { + if (payload.message[0].match(/Run complete with status: fail/)) { + log = payload.message[0]; + } + }, + false + ); lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { t.truthy(log); diff --git a/packages/ws-worker/test/util/versions.test.ts b/packages/ws-worker/test/util/versions.test.ts index 6aadf00a3..1a0911bdf 100644 --- a/packages/ws-worker/test/util/versions.test.ts +++ b/packages/ws-worker/test/util/versions.test.ts @@ -25,18 +25,18 @@ const parse = (str: string) => { }; test('calculate version string', (t) => { - const str = calculateVersionString('step-1', versions); + const str = calculateVersionString(versions); // Formatting is super fussy in this test but it's sort of OK t.is( str, - `Versions for step step-1: + `Versions: ▸ node.js 1 ▸ worker 2` ); }); test('helper should parse a version string and return the correct order', (t) => { - const str = calculateVersionString('step-1', versions); + const str = calculateVersionString(versions); const parsed = parse(str); t.deepEqual(parsed, [ @@ -47,7 +47,7 @@ test('helper should parse a version string and return the correct order', (t) => test("show unknown if a version isn't passed", (t) => { // @ts-ignore - const str = calculateVersionString('step-1', {}); + const str = calculateVersionString({}); const parsed = parse(str); t.deepEqual(parsed, [ @@ -57,8 +57,8 @@ test("show unknown if a version isn't passed", (t) => { }); test('show adaptors last', (t) => { - const str = calculateVersionString('step-1', { - '@openfn/language-common': '1.0.0', + const str = calculateVersionString({ + '@openfn/language-common': ['1.0.0'], ...versions, }); const parsed = parse(str); @@ -67,10 +67,10 @@ test('show adaptors last', (t) => { }); test('sort and list multiple adaptors', (t) => { - const str = calculateVersionString('step-1', { - j: '2', - z: '3', - a: '1', + const str = calculateVersionString({ + j: ['2'], + z: ['3'], + a: ['1'], ...versions, }); @@ -84,3 +84,13 @@ test('sort and list multiple adaptors', (t) => { t.deepEqual(j, ['j', '2']); t.deepEqual(z, ['z', '3']); }); + +test('show multiple adaptor versions', (t) => { + const str = calculateVersionString({ + '@openfn/language-common': ['1.0.0', '2.0.0'], + ...versions, + }); + const parsed = parse(str); + const common = parsed[2]; + t.deepEqual(common, ['@openfn/language-common', '1.0.0,', '2.0.0']); +}); From 8c94a9d41989951bdab0041b0852362d8c1d1b64 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 23 Feb 2024 18:00:41 +0000 Subject: [PATCH 3/3] versions: worker@1.1.0, cli@1.1.0 --- .changeset/calm-books-care.md | 6 ------ .changeset/fair-bobcats-applaud.md | 5 ----- .changeset/yellow-clouds-thank.md | 5 ----- integration-tests/worker/CHANGELOG.md | 11 +++++++++++ integration-tests/worker/package.json | 2 +- packages/cli/CHANGELOG.md | 9 +++++++++ packages/cli/package.json | 2 +- packages/engine-multi/CHANGELOG.md | 12 ++++++++++++ packages/engine-multi/package.json | 2 +- packages/lightning-mock/CHANGELOG.md | 9 +++++++++ packages/lightning-mock/package.json | 2 +- packages/runtime/CHANGELOG.md | 6 ++++++ packages/runtime/package.json | 2 +- packages/ws-worker/CHANGELOG.md | 12 ++++++++++++ packages/ws-worker/package.json | 2 +- 15 files changed, 65 insertions(+), 22 deletions(-) delete mode 100644 .changeset/calm-books-care.md delete mode 100644 .changeset/fair-bobcats-applaud.md delete mode 100644 .changeset/yellow-clouds-thank.md diff --git a/.changeset/calm-books-care.md b/.changeset/calm-books-care.md deleted file mode 100644 index dafa1bd09..000000000 --- a/.changeset/calm-books-care.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -'@openfn/engine-multi': minor -'@openfn/runtime': minor ---- - -Support workflows with different versions of the same adaptor diff --git a/.changeset/fair-bobcats-applaud.md b/.changeset/fair-bobcats-applaud.md deleted file mode 100644 index bc5da4f74..000000000 --- a/.changeset/fair-bobcats-applaud.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/engine-multi': patch ---- - -Record adaptor versions as an array diff --git a/.changeset/yellow-clouds-thank.md b/.changeset/yellow-clouds-thank.md deleted file mode 100644 index fabbc3d3b..000000000 --- a/.changeset/yellow-clouds-thank.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/ws-worker': patch ---- - -Move version log to workflow start diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 57ec04d55..7f6aece75 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,16 @@ # @openfn/integration-tests-worker +## 1.0.36 + +### Patch Changes + +- Updated dependencies [4f5f1dd] +- Updated dependencies [58e0d11] +- Updated dependencies [58e0d11] + - @openfn/engine-multi@1.1.0 + - @openfn/ws-worker@1.0.1 + - @openfn/lightning-mock@2.0.1 + ## 1.0.35 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index b9be215c8..a56a8b1ad 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.35", + "version": "1.0.36", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index d63fb5564..922085f3d 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/cli +## 1.1.0 + +### Patch Changes + +Allow multiple version of the same adaptor to run in the same workflow + +- Updated dependencies [4f5f1dd] + - @openfn/runtime@1.1.0 + ## 1.0.0 ### Major Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index 1b1f72ad0..b7de5b514 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.0.0", + "version": "1.1.0", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index ef3d59bb6..c1ba43d97 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,17 @@ # engine-multi +## 1.1.0 + +### Minor Changes + +- 4f5f1dd: Support workflows with different versions of the same adaptor + +### Patch Changes + +- 58e0d11: Record adaptor versions as an array +- Updated dependencies [4f5f1dd] + - @openfn/runtime@1.1.0 + ## 1.0.0 ### Major Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 0b4c520cc..251523fbb 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.0.0", + "version": "1.1.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index edff55188..1ccb71880 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/lightning-mock +## 2.0.1 + +### Patch Changes + +- Updated dependencies [4f5f1dd] +- Updated dependencies [58e0d11] + - @openfn/engine-multi@1.1.0 + - @openfn/runtime@1.1.0 + ## 2.0.0 ### Major Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 6d44a6698..d19b4ca49 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.0", + "version": "2.0.1", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index a3f02e071..756d2abc3 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.1.0 + +### Minor Changes + +- 4f5f1dd: Support workflows with different versions of the same adaptor + ## 1.0.0 ### Major Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 8c00db466..79aed3a63 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.0.0", + "version": "1.1.0", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index d717a45cd..e959ac5d9 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,17 @@ # ws-worker +## 1.1.0 + +Allow runs to use multiple versions of the same adaptor + +### Patch Changes + +- 58e0d11: Move version log to workflow start +- Updated dependencies [4f5f1dd] +- Updated dependencies [58e0d11] + - @openfn/engine-multi@1.1.0 + - @openfn/runtime@1.1.0 + ## 1.0.0 ### Major Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 82eb2be59..af5a2f972 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.0.0", + "version": "1.1.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",