diff --git a/integration-tests/execute/CHANGELOG.md b/integration-tests/execute/CHANGELOG.md index 4227ce99a..68e4e31cf 100644 --- a/integration-tests/execute/CHANGELOG.md +++ b/integration-tests/execute/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/integration-tests-execute +## 1.0.6 + +### Patch Changes + +- Updated dependencies [3463ff9] +- Updated dependencies [7a85894] +- Updated dependencies [b6de2c4] + - @openfn/runtime@1.5.0 + - @openfn/compiler@0.4.0 + ## 1.0.5 ### Patch Changes diff --git a/integration-tests/execute/package.json b/integration-tests/execute/package.json index ad218cfcb..85e594a98 100644 --- a/integration-tests/execute/package.json +++ b/integration-tests/execute/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-execute", "private": true, - "version": "1.0.5", + "version": "1.0.6", "description": "Job execution tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/execute/src/execute.ts b/integration-tests/execute/src/execute.ts index 49aba31d7..b4fd40f12 100644 --- a/integration-tests/execute/src/execute.ts +++ b/integration-tests/execute/src/execute.ts @@ -6,10 +6,12 @@ const execute = async (job: string, state: any, adaptor = 'common') => { // compile with common and dumb imports const options = { 'add-imports': { - adaptor: { - name: `@openfn/language-${adaptor}`, - exportAll: true, - }, + adaptors: [ + { + name: `@openfn/language-${adaptor}`, + exportAll: true, + }, + ], }, }; const compiled = compiler(job, options); diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 67f321c2c..df7816a03 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,18 @@ # @openfn/integration-tests-worker +## 1.0.63 + +### Patch Changes + +- Updated dependencies [fd0e499] +- Updated dependencies [1c79dc1] +- Updated dependencies [7245bf7] +- Updated dependencies [b15f151] +- Updated dependencies [bcd82e9] + - 
@openfn/ws-worker@1.8.0 + - @openfn/engine-multi@1.4.0 + - @openfn/lightning-mock@2.0.21 + ## 1.0.62 ### Patch Changes diff --git a/integration-tests/worker/monorepo/packages/common/index.js b/integration-tests/worker/monorepo/packages/common/index.js new file mode 100644 index 000000000..0406060fa --- /dev/null +++ b/integration-tests/worker/monorepo/packages/common/index.js @@ -0,0 +1,8 @@ +function fortyTwo() { + return (state) => { + state.data = 42; + return state; + }; +} + +module.exports = { fortyTwo }; diff --git a/integration-tests/worker/monorepo/packages/common/package.json b/integration-tests/worker/monorepo/packages/common/package.json new file mode 100644 index 000000000..e4b68cfec --- /dev/null +++ b/integration-tests/worker/monorepo/packages/common/package.json @@ -0,0 +1,7 @@ +{ + "name": "@openfn/language-common", + "private": true, + "version": "1.0.0", + "dependencies": {}, + "devDependencies": {} +} diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 089ee40ae..8bb566c17 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.62", + "version": "1.0.63", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/src/init.ts b/integration-tests/worker/src/init.ts index 9e1c768a3..358cc2687 100644 --- a/integration-tests/worker/src/init.ts +++ b/integration-tests/worker/src/init.ts @@ -5,6 +5,7 @@ import createLightningServer, { toBase64 } from '@openfn/lightning-mock'; import createEngine from '@openfn/engine-multi'; import createWorkerServer from '@openfn/ws-worker'; import { createMockLogger } from '@openfn/logger'; +// import createLogger from '@openfn/logger'; export const randomPort = () => Math.round(2000 + Math.random() * 1000); @@ -43,6 +44,7 @@ export const initWorker = async 
( port: workerPort, lightning: `ws://localhost:${lightningPort}/worker`, secret: crypto.randomUUID(), + collectionsVersion: '1.0.0', ...workerArgs, }); diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index 57274819a..2d29e8ea9 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -21,7 +21,10 @@ test.before(async () => { maxWorkers: 1, repoDir: path.resolve('tmp/repo/integration'), }; - const workerArgs = { runPublicKey: keys.public }; + const workerArgs = { + runPublicKey: keys.public, + monorepoDir: path.resolve('monorepo'), + }; ({ worker, engine, engineLogger } = await initWorker( lightningPort, @@ -599,6 +602,27 @@ test.serial('Include timestamps on basically everything', (t) => { }); }); +test.serial('use local adaptor versions from monorepo', (t) => { + return new Promise(async (done) => { + lightning.once('run:complete', (evt) => { + const result = lightning.getResult('a1'); + t.deepEqual(result, { data: 42 }); + done(); + }); + + lightning.enqueueRun({ + id: 'a1', + jobs: [ + { + id: 'j1', + body: 'fortyTwo()', + adaptor: '@openfn/language-common@local', + }, + ], + }); + }); +}); + test.serial("Don't send adaptor logs to stdout", (t) => { return new Promise(async (done) => { // We have to create a new worker with a different repo for this one diff --git a/integration-tests/worker/test/runs.test.ts b/integration-tests/worker/test/runs.test.ts index 37caf94c8..7c15b29df 100644 --- a/integration-tests/worker/test/runs.test.ts +++ b/integration-tests/worker/test/runs.test.ts @@ -25,6 +25,7 @@ test.before(async () => { repoDir: path.resolve('tmp/repo/attempts'), }, { + collectionsVersion: '1.0.0-next-f802225c', runPublicKey: keys.public, } )); @@ -49,8 +50,8 @@ const run = async (t, attempt) => { // TODO friendlier job names for this would be nice (rather than run ids) t.log( `run ${payload.step_id} done in ${payload.duration / 
1000}s [${humanMb( - payload.mem.job - )} / ${humanMb(payload.mem.system)}mb] [thread ${payload.thread_id}]` + payload.mem?.job + )} / ${humanMb(payload.mem?.system)}mb] [thread ${payload.thread_id}]` ); }); lightning.on('run:complete', (evt) => { @@ -248,3 +249,36 @@ test.serial('use different versions of the same adaptor', async (t) => { t.log(result); t.falsy(result.errors); }); + +test.serial('Run with collections', async (t) => { + const job1 = createJob({ + body: `fn((state = {}) => { + const server = collections.createMockServer(); + collections.setMockClient(server); + + server.api.createCollection('collection'); + + state.data = [{ id: 'a' }, { id: 'b' }, { id: 'c' }]; + state.results = []; + return state; + }); + + collections.set('collection', v => v.id, $.data); + + collections.each('collection', '*', (state, value, key) => { + state.results.push({ key, value }) + }); + `, + // Note: for some reason 1.7.0 fails because it exports a collections ?? + // 1.7.4 seems fine + adaptor: '@openfn/language-common@1.7.4', + }); + const attempt = createRun([], [job1], []); + + const { results } = await run(t, attempt); + t.deepEqual(results, [ + { key: 'a', value: { id: 'a' } }, + { key: 'b', value: { id: 'b' } }, + { key: 'c', value: { id: 'c' } }, + ]); +}); diff --git a/integration-tests/worker/test/server.test.ts b/integration-tests/worker/test/server.test.ts index 4c22acba1..9d403a406 100644 --- a/integration-tests/worker/test/server.test.ts +++ b/integration-tests/worker/test/server.test.ts @@ -21,6 +21,7 @@ const spawnServer = (port: string | number = 1, args: string[] = []) => { '--backoff 0.001/0.01', '--log debug', '-s secretsquirrel', + '--collections-version=1.0.0', ...args, ], options diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 2ddb6377b..f11db6ae4 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,16 @@ # @openfn/cli +## 1.8.6 + +### Patch Changes + +- 528e9a0: Support multiple adaptors +- 
Updated dependencies [3463ff9] +- Updated dependencies [7a85894] +- Updated dependencies [b6de2c4] + - @openfn/runtime@1.5.0 + - @openfn/compiler@0.4.0 + ## 1.8.5 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index 996d5250c..742a1b439 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.8.5", + "version": "1.8.6", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/cli/src/compile/compile.ts b/packages/cli/src/compile/compile.ts index a2f8285f6..413a7057e 100644 --- a/packages/cli/src/compile/compile.ts +++ b/packages/cli/src/compile/compile.ts @@ -56,10 +56,8 @@ const compileWorkflow = async ( const job = step as Job; const jobOpts = { ...opts, + adaptors: job.adaptors ?? opts.adaptors, }; - if (job.adaptor) { - jobOpts.adaptors = [job.adaptor]; - } if (job.expression) { job.expression = await compileJob( job.expression as string, @@ -115,37 +113,37 @@ export const loadTransformOptions = async ( // If an adaptor is passed in, we need to look up its declared exports // and pass them along to the compiler if (opts.adaptors?.length && opts.ignoreImports != true) { - let exports; - const [pattern] = opts.adaptors; - const [specifier] = pattern.split('='); - - // Preload exports from a path, optionally logging errors in case of a failure - log.debug(`Trying to preload types for ${specifier}`); - const path = await resolveSpecifierPath(pattern, opts.repoDir, log); - if (path) { - try { - exports = await preloadAdaptorExports( - path, - opts.useAdaptorsMonorepo, - log - ); - } catch (e) { - log.error(`Failed to load adaptor typedefs from path ${path}`); - log.error(e); + const adaptorsConfig = []; + for (const adaptorInput of opts.adaptors) { + let exports; + const [specifier] = adaptorInput.split('='); + + // Preload exports from a path, optionally logging errors in case of a failure + 
log.debug(`Trying to preload types for ${specifier}`); + const path = await resolveSpecifierPath(adaptorInput, opts.repoDir, log); + if (path) { + try { + exports = await preloadAdaptorExports(path, log); + } catch (e) { + log.error(`Failed to load adaptor typedefs from path ${path}`); + log.error(e); + } } - } - if (!exports || exports.length === 0) { - log.debug(`No module exports found for ${pattern}`); - } + if (!exports || exports.length === 0) { + log.debug(`No module exports found for ${adaptorInput}`); + } - options['add-imports'] = { - ignore: opts.ignoreImports as string[], - adaptor: { + adaptorsConfig.push({ name: stripVersionSpecifier(specifier), exports, exportAll: true, - }, + }); + } + + options['add-imports'] = { + ignore: opts.ignoreImports as string[], + adaptors: adaptorsConfig, }; } diff --git a/packages/cli/src/execute/execute.ts b/packages/cli/src/execute/execute.ts index 62286fdb8..a01cf4d8a 100644 --- a/packages/cli/src/execute/execute.ts +++ b/packages/cli/src/execute/execute.ts @@ -67,14 +67,16 @@ export function parseAdaptors(plan: ExecutionPlan) { const adaptors: ModuleInfoMap = {}; - // TODO what if there are different versions of the same adaptor? 
- // This structure can't handle it - we'd need to build it for every job Object.values(plan.workflow.steps).forEach((step) => { const job = step as Job; - if (job.adaptor) { - const { name, ...maybeVersionAndPath } = extractInfo(job.adaptor); + // Usually every job should have an adaptors array + // But there are a couple of cases mostly in test, when validation is skipped, + // when the array may not be set + // It's mostly redundant nbut harmless to optionally chain here + job.adaptors?.forEach((adaptor) => { + const { name, ...maybeVersionAndPath } = extractInfo(adaptor); adaptors[name] = maybeVersionAndPath; - } + }); }); return adaptors; diff --git a/packages/cli/src/execute/get-autoinstall-targets.ts b/packages/cli/src/execute/get-autoinstall-targets.ts index 677f41f50..aea294366 100644 --- a/packages/cli/src/execute/get-autoinstall-targets.ts +++ b/packages/cli/src/execute/get-autoinstall-targets.ts @@ -5,9 +5,11 @@ const getAutoinstallTargets = (plan: ExecutionPlan) => { Object.values(plan.workflow.steps).forEach((step) => { const job = step as Job; // Do not autoinstall adaptors with a path - if (job.adaptor && !/=/.test(job.adaptor)) { - adaptors[job.adaptor] = true; - } + job.adaptors + ?.filter((adaptor) => !/=/.test(adaptor)) + .forEach((adaptor) => { + adaptors[adaptor] = true; + }); }); return Object.keys(adaptors); }; diff --git a/packages/cli/src/execute/handler.ts b/packages/cli/src/execute/handler.ts index 892d8f9cc..294e3c1c9 100644 --- a/packages/cli/src/execute/handler.ts +++ b/packages/cli/src/execute/handler.ts @@ -85,7 +85,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => { if (options.start) { customStart = matchStep( plan, - options.start ?? plan.options.start, + options.start ?? plan.options!.start, 'start', logger ); @@ -95,7 +95,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => { if (options.end) { customEnd = matchStep( plan, - options.end ?? plan.options.end, + options.end ?? 
plan.options!.end, 'end', logger ); @@ -113,8 +113,8 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => { const finalPlan = { ...plan, options: { - ...plan.options, - start: customStart || plan.options.start, + ...plan.options!, + start: customStart || plan.options!.start, end: customEnd, }, workflow: plan.workflow, diff --git a/packages/cli/src/types.ts b/packages/cli/src/types.ts index ed27ef8bc..ba0b3b8cf 100644 --- a/packages/cli/src/types.ts +++ b/packages/cli/src/types.ts @@ -1,4 +1,7 @@ // the executionPLan for the CLI is a bit differnt to the runtime's perspective + +import { Trigger, UUID, WorkflowOptions } from '@openfn/lexicon'; + // Ie config can be a string export type JobNodeID = string; @@ -9,19 +12,26 @@ export type OldCLIWorkflow = { }; export type CLIExecutionPlan = { - id?: string; // UUID for this plan - start?: JobNodeID; - jobs: CLIJobNode[]; + id?: string; + options?: WorkflowOptions; + workflow: { + id?: UUID; + name?: string; + steps: Array; + }; }; export type CLIJobNode = { id?: string; - adaptor?: string; expression?: string; // path or expression configuration?: string | object; // path or credential object data?: any; - next?: string | Record; + + // We can accept a single adaptor or multiple + // The CLI will convert it to adaptors as an array + adaptor?: string; + adaptors?: string[]; }; export type CLIJobEdge = { diff --git a/packages/cli/src/util/expand-adaptors.ts b/packages/cli/src/util/expand-adaptors.ts index 45b952e9d..3ccd72fd1 100644 --- a/packages/cli/src/util/expand-adaptors.ts +++ b/packages/cli/src/util/expand-adaptors.ts @@ -14,8 +14,6 @@ const expand = (name: string) => { type ArrayOrPlan = T extends string[] ? string[] : ExecutionPlan; -// TODO typings here aren't good,I can't get this to work! 
-// At least this looks nice externally export default | ExecutionPlan>( input: T ): ArrayOrPlan => { @@ -26,8 +24,8 @@ export default | ExecutionPlan>( const plan = input as ExecutionPlan; Object.values(plan.workflow.steps).forEach((step) => { const job = step as Job; - if (job.adaptor) { - job.adaptor = expand(job.adaptor); + if (job.adaptors) { + job.adaptors = job.adaptors.map(expand); } }); diff --git a/packages/cli/src/util/load-plan.ts b/packages/cli/src/util/load-plan.ts index 4b071d3fb..ed12eb8f4 100644 --- a/packages/cli/src/util/load-plan.ts +++ b/packages/cli/src/util/load-plan.ts @@ -8,7 +8,7 @@ import mapAdaptorsToMonorepo from './map-adaptors-to-monorepo'; import type { ExecutionPlan, Job, WorkflowOptions } from '@openfn/lexicon'; import type { Opts } from '../options'; import type { Logger } from './logger'; -import type { OldCLIWorkflow } from '../types'; +import type { CLIExecutionPlan, CLIJobNode, OldCLIWorkflow } from '../types'; const loadPlan = async ( options: Pick< @@ -36,6 +36,7 @@ const loadPlan = async ( const json = await loadJson(jsonPath!, logger); const defaultName = path.parse(jsonPath!).name; + if (json.workflow) { return loadXPlan(json, options, logger, defaultName); } else { @@ -94,21 +95,17 @@ const loadExpression = async ( const expression = await fs.readFile(expressionPath, 'utf8'); const name = path.parse(expressionPath).name; - const step: Job = { expression }; - - // The adaptor should have been expanded nicely already, so we don't need intervene here - if (options.adaptors) { - const [adaptor] = options.adaptors; - if (adaptor) { - step.adaptor = adaptor; - } - } + const step: Job = { + expression, + // The adaptor should have been expanded nicely already, so we don't need intervene here + adaptors: options.adaptors ?? [], + }; const wfOptions: WorkflowOptions = {}; // TODO support state props to remove? 
maybeAssign(options, wfOptions, ['timeout']); - const plan: ExecutionPlan = { + const plan: CLIExecutionPlan = { workflow: { name, steps: [step], @@ -126,7 +123,7 @@ const loadExpression = async ( ); // This will never execute - return {} as ExecutionPlan; + return {} as CLIExecutionPlan; } }; @@ -188,7 +185,7 @@ const fetchFile = async ( // TODO this is currently untested in load-plan // (but covered a bit in execute tests) const importExpressions = async ( - plan: ExecutionPlan, + plan: CLIExecutionPlan, rootDir: string, log: Logger ) => { @@ -234,8 +231,22 @@ const importExpressions = async ( } }; +// Allow users to specify a single adaptor on a job, +// but convert the internal representation into an array +const ensureAdaptors = (plan: CLIExecutionPlan) => { + Object.values(plan.workflow.steps).forEach((step) => { + const job = step as CLIJobNode; + if (job.adaptor) { + job.adaptors = [job.adaptor]; + delete job.adaptor; + } + // Also, ensure there is an empty adaptors array, which makes everything else easier + job.adaptors ??= []; + }); +}; + const loadXPlan = async ( - plan: ExecutionPlan, + plan: CLIExecutionPlan, options: Pick, logger: Logger, defaultName: string = '' @@ -247,6 +258,8 @@ const loadXPlan = async ( if (!plan.workflow.name && defaultName) { plan.workflow.name = defaultName; } + ensureAdaptors(plan); + // Note that baseDir should be set up in the default function await importExpressions(plan, options.baseDir!, logger); // expand shorthand adaptors in the workflow jobs @@ -261,5 +274,5 @@ const loadXPlan = async ( logger.info(`Loaded workflow ${plan.workflow.name ?? 
''}`); - return plan; + return plan as ExecutionPlan; }; diff --git a/packages/cli/src/util/map-adaptors-to-monorepo.ts b/packages/cli/src/util/map-adaptors-to-monorepo.ts index 4e25d9876..8154a124f 100644 --- a/packages/cli/src/util/map-adaptors-to-monorepo.ts +++ b/packages/cli/src/util/map-adaptors-to-monorepo.ts @@ -58,8 +58,10 @@ const mapAdaptorsToMonorepo = ( const plan = input as ExecutionPlan; Object.values(plan.workflow.steps).forEach((step) => { const job = step as Job; - if (job.adaptor) { - job.adaptor = updatePath(job.adaptor, monorepoPath, log); + if (job.adaptors) { + job.adaptors = job.adaptors.map((a) => + updatePath(a, monorepoPath, log) + ); } }); diff --git a/packages/cli/src/util/print-versions.ts b/packages/cli/src/util/print-versions.ts index 7532a7ff8..d82075ccb 100644 --- a/packages/cli/src/util/print-versions.ts +++ b/packages/cli/src/util/print-versions.ts @@ -30,14 +30,13 @@ const printVersions = async ( includeComponents = false ) => { const { adaptors, logJson } = options; - let adaptor = ''; - if (adaptors && adaptors.length) { - adaptor = adaptors[0]; - } - let adaptorVersion; - let adaptorName = ''; - if (adaptor) { + let longestAdaptorName = ''; + const adaptorList: Array[] = []; + + adaptors?.forEach((adaptor) => { + let adaptorVersion; + let adaptorName = ''; if (adaptor.match('=')) { const [namePart, pathPart] = adaptor.split('='); adaptorVersion = loadVersionFromPath(pathPart); @@ -50,11 +49,15 @@ const printVersions = async ( adaptorName = name; adaptorVersion = version || 'latest'; } - } + adaptorList.push([adaptorName, adaptorVersion]); + if (adaptorName.length > longestAdaptorName.length) { + longestAdaptorName = adaptorName; + } + }); // Work out the longest label const longest = Math.max( - ...[NODE, CLI, RUNTIME, COMPILER, adaptorName].map((s) => s.length) + ...[NODE, CLI, RUNTIME, COMPILER, longestAdaptorName].map((s) => s.length) ); // Prefix and pad version numbers @@ -83,8 +86,10 @@ const printVersions = async ( 
output.versions.runtime = runtimeVersion; output.versions.compiler = compilerVersion; } - if (adaptorName) { - output.versions[adaptorName] = adaptorVersion; + if (adaptorList.length) { + for (const [name, version] of adaptorList) { + output.versions[name] = version; + } } } else { output = `Versions: @@ -96,8 +101,10 @@ ${prefix(CLI)}${version}`; ${prefix(COMPILER)}${compilerVersion}`; } - if (adaptorName) { - output += `\n${prefix(adaptorName)}${adaptorVersion}`; + if (adaptorList.length) { + for (const [name, version] of adaptorList) { + output += `\n${prefix(name)}${version}`; + } } } logger.always(output); diff --git a/packages/cli/src/util/validate-plan.ts b/packages/cli/src/util/validate-plan.ts index 87f6d2c49..b966e101a 100644 --- a/packages/cli/src/util/validate-plan.ts +++ b/packages/cli/src/util/validate-plan.ts @@ -1,4 +1,4 @@ -import { ExecutionPlan, Step, WorkflowOptions } from '@openfn/lexicon'; +import { ExecutionPlan, Job, Trigger, WorkflowOptions } from '@openfn/lexicon'; import { Logger } from '@openfn/logger'; const assertWorkflowStructure = (plan: ExecutionPlan, logger: Logger) => { @@ -23,13 +23,13 @@ const assertWorkflowStructure = (plan: ExecutionPlan, logger: Logger) => { assertOptionsStructure(options, logger); }; -const assertStepStructure = (step: Step, index: number) => { +const assertStepStructure = (step: Job | Trigger, index: number) => { const allowedKeys = [ 'id', 'name', 'next', 'previous', - 'adaptor', + 'adaptors', 'expression', 'state', 'configuration', @@ -42,7 +42,7 @@ const assertStepStructure = (step: Step, index: number) => { } } - if ('adaptor' in step && !('expression' in step)) { + if ((step as Job).adaptors?.length && !('expression' in step)) { throw new Error( `Step ${step.id ?? 
index} with an adaptor must also have an expression` ); diff --git a/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/index.js b/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/index.js index 8538c25d4..59b37327a 100644 --- a/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/index.js +++ b/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/index.js @@ -1,3 +1,4 @@ export const execute = () => () => 'execute called!'; -export const fn = (f) => (state) => f(state); +// don't use the same functions as common +export const alterState = (f) => (state) => f(state); diff --git a/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/types.d.ts b/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/types.d.ts index b53ff1e00..3a8f37201 100644 --- a/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/types.d.ts +++ b/packages/cli/test/__repo__/node_modules/@openfn/language-postgres_0.0.1/types.d.ts @@ -1,3 +1,3 @@ export declare function execute(f: Array<(state: any) => any>): number; -export declare function fn(f: (state: any) => any): any; +export declare function alterState(f: (state: any) => any): any; diff --git a/packages/cli/test/commands.test.ts b/packages/cli/test/commands.test.ts index cbf69e3e5..0476e76e0 100644 --- a/packages/cli/test/commands.test.ts +++ b/packages/cli/test/commands.test.ts @@ -511,7 +511,7 @@ test.serial( 'use execute from language-postgres: openfn job.js -a @openfn/language-postgres', async (t) => { const job = - 'fn((state) => { /* function isn\t actually called by the mock adaptor */ throw new Error("fake adaptor") });'; + 'alterState((state) => { /* function isn\t actually called by the mock adaptor */ throw new Error("fake adaptor") });'; const result = await run( 'openfn -a @openfn/language-postgres --no-autoinstall', job, diff --git a/packages/cli/test/compile/compile.test.ts 
b/packages/cli/test/compile/compile.test.ts index 46f6520d1..a4b1dcb5d 100644 --- a/packages/cli/test/compile/compile.test.ts +++ b/packages/cli/test/compile/compile.test.ts @@ -20,10 +20,10 @@ const expressionPath = '/job.js'; type TransformOptionsWithImports = { ['add-imports']: { ignore: true | string[]; - adaptor: { + adaptors: Array<{ name: string; exports: string[]; - }; + }>; }; }; @@ -234,7 +234,7 @@ test.serial( t.truthy(result['add-imports']); // Should describe the exports of the times-two module - const { name, exports } = result['add-imports'].adaptor; + const [{ name, exports }] = result['add-imports'].adaptors; t.assert(name === 'times-two'); t.assert(exports.includes('byTwo')); } @@ -257,7 +257,7 @@ test.serial( t.truthy(result['add-imports']); // Should describe the exports of the times-two module - const { name, exports } = result['add-imports'].adaptor; + const [{ name, exports }] = result['add-imports'].adaptors; t.assert(name === 'times-two'); t.assert(exports.includes('byTwo')); } @@ -282,7 +282,7 @@ test.serial( t.truthy(result['add-imports']); // Should describe the exports of the times-two module - const { name, exports } = result['add-imports'].adaptor; + const [{ name, exports }] = result['add-imports'].adaptors; t.assert(name === 'times-two'); t.assert(exports.includes('byTwo')); } diff --git a/packages/cli/test/execute/get-autoinstall-targets.test.ts b/packages/cli/test/execute/get-autoinstall-targets.test.ts index 33a29786b..38e525346 100644 --- a/packages/cli/test/execute/get-autoinstall-targets.test.ts +++ b/packages/cli/test/execute/get-autoinstall-targets.test.ts @@ -29,14 +29,14 @@ test('plan with zero adaptors', (t) => { t.is(result.length, 0); }); -test('plan with multiple adaptors', (t) => { +test('plan with multiple adaptors in multiple steps', (t) => { const plan = getPlan([ { - adaptor: '@openfn/language-common', + adaptors: ['@openfn/language-common'], expression: 'fn()', }, { - adaptor: '@openfn/language-http', + adaptors: 
['@openfn/language-http'], expression: 'fn()', }, ]); @@ -48,11 +48,11 @@ test('plan with multiple adaptors', (t) => { test('plan with duplicate adaptors', (t) => { const plan = getPlan([ { - adaptor: '@openfn/language-common', + adaptors: ['@openfn/language-common'], expression: 'fn()', }, { - adaptor: '@openfn/language-common', + adaptors: ['@openfn/language-common'], expression: 'fn()', }, ]); @@ -61,18 +61,34 @@ test('plan with duplicate adaptors', (t) => { t.deepEqual(result, ['@openfn/language-common']); }); +test('plan with multiple adaptors in one step with duplicates', (t) => { + const plan = getPlan([ + { + adaptors: [ + '@openfn/language-common', + '@openfn/language-http', + '@openfn/language-http', + ], + expression: 'fn()', + }, + ]); + const result = getAutoinstallTargets(plan); + t.is(result.length, 2); + t.deepEqual(result, ['@openfn/language-common', '@openfn/language-http']); +}); + test('plan with one adaptor but different versions', (t) => { const plan = getPlan([ { - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], expression: 'fn()', }, { - adaptor: '@openfn/language-common@2.0.0', + adaptors: ['@openfn/language-common@2.0.0'], expression: 'fn()', }, { - adaptor: '@openfn/language-common@3.0.0', + adaptors: ['@openfn/language-common@3.0.0'], expression: 'fn()', }, ]); @@ -89,7 +105,7 @@ test('do not return adaptors with a path', (t) => { const plan = getPlan([ { expression: 'fn()', - adaptor: 'common=a/b/c', + adaptors: ['common=a/b/c'], }, ]); const result = getAutoinstallTargets(plan); diff --git a/packages/cli/test/execute/parse-adaptors.test.ts b/packages/cli/test/execute/parse-adaptors.test.ts index cdbdf6753..8bbb0efdd 100644 --- a/packages/cli/test/execute/parse-adaptors.test.ts +++ b/packages/cli/test/execute/parse-adaptors.test.ts @@ -7,7 +7,7 @@ const createPlan = (adaptor: string): ExecutionPlan => ({ workflow: { steps: [ { - adaptor, + adaptors: [adaptor], expression: '.', }, ], @@ -68,22 
+68,22 @@ test('parse plan with several steps', (t) => { workflow: { steps: [ { - adaptor: '@openfn/language-common', + adaptors: ['@openfn/language-common'], expression: 'fn()', }, { - adaptor: '@openfn/language-http@1.0.0', + adaptors: ['@openfn/language-http@1.0.0'], expression: 'fn()', }, { - adaptor: '@openfn/language-salesforce=a/b/c', + adaptors: ['@openfn/language-salesforce=a/b/c'], expression: 'fn()', }, ], }, }; const result = parseAdaptors(plan); - t.assert(Object.keys(result).length === 3); + t.is(Object.keys(result).length, 3); t.deepEqual(result, { '@openfn/language-common': {}, '@openfn/language-http': { @@ -94,22 +94,3 @@ test('parse plan with several steps', (t) => { }, }); }); - -// TODO we can't do this right now -// We'd have to send different maps to different jobs -// Which we can support but maybe I'm gonna push that out of scope -test.skip('parse workflow with multiple versions of the same adaptor', (t) => { - const workflow = { - start: 'a', - jobs: { - a: { - adaptor: '@openfn/language-common@1.0.0', - expression: 'fn()', - }, - b: { - adaptor: '@openfn/language-common@2.0.0', - expression: 'fn()', - }, - }, - }; -}); diff --git a/packages/cli/test/util/expand-adaptors.test.ts b/packages/cli/test/util/expand-adaptors.test.ts index 23f1a006d..023e70ca6 100644 --- a/packages/cli/test/util/expand-adaptors.test.ts +++ b/packages/cli/test/util/expand-adaptors.test.ts @@ -61,22 +61,22 @@ test('expands adaptors in an execution plan', (t) => { steps: [ { id: 'a', - adaptor: 'common', + adaptors: ['common'], expression: 'fn()', }, { id: 'b', - adaptor: 'http@1.0.0', + adaptors: ['http@1.0.0'], expression: 'fn()', }, { id: 'c', - adaptor: 'salesforce=a/b/c', + adaptors: ['salesforce=a/b/c'], expression: 'fn()', }, { id: 'd', - adaptor: 'a/b/c/my-adaptor.js', + adaptors: ['a/b/c/my-adaptor.js'], expression: 'fn()', }, ], @@ -85,8 +85,8 @@ test('expands adaptors in an execution plan', (t) => { }; expandAdaptors(plan); const [a, b, c, d] = 
plan.workflow.steps; - t.is(a.adaptor, '@openfn/language-common'); - t.is(b.adaptor, '@openfn/language-http@1.0.0'); - t.is(c.adaptor, '@openfn/language-salesforce=a/b/c'); - t.is(d.adaptor, 'a/b/c/my-adaptor.js'); + t.is(a.adaptors[0], '@openfn/language-common'); + t.is(b.adaptors[0], '@openfn/language-http@1.0.0'); + t.is(c.adaptors[0], '@openfn/language-salesforce=a/b/c'); + t.is(d.adaptors[0], 'a/b/c/my-adaptor.js'); }); diff --git a/packages/cli/test/util/load-plan.test.ts b/packages/cli/test/util/load-plan.test.ts index e2e296e3c..6a44c6a5a 100644 --- a/packages/cli/test/util/load-plan.test.ts +++ b/packages/cli/test/util/load-plan.test.ts @@ -1,7 +1,7 @@ import test from 'ava'; import mock from 'mock-fs'; import { createMockLogger } from '@openfn/logger'; -import type { Job } from '@openfn/lexicon'; +import type { Job, LegacyJob } from '@openfn/lexicon'; import loadPlan from '../../src/util/load-plan'; import { Opts } from '../../src/options'; @@ -12,11 +12,11 @@ const sampleXPlan = { options: { start: 'a' }, workflow: { name: 'wf', - steps: [{ id: 'a', expression: 'x()' }], + steps: [{ id: 'a', expression: 'x()', adaptors: [] }], }, }; -const createPlan = (steps: Job[] = []) => ({ +const createPlan = (steps: Partial[] = []) => ({ workflow: { steps, }, @@ -56,6 +56,7 @@ test.serial('expression: load a plan from an expression.js', async (t) => { t.is(plan.workflow.name, 'job'); t.deepEqual(plan.workflow.steps[0], { expression: 'x', + adaptors: [], }); }); @@ -70,7 +71,7 @@ test.serial('expression: set an adaptor on the plan', async (t) => { const step = plan.workflow.steps[0] as Job; - t.is(step.adaptor, '@openfn/language-common'); + t.is(step.adaptors[0], '@openfn/language-common'); }); test.serial('expression: do not expand adaptors', async (t) => { @@ -85,7 +86,7 @@ test.serial('expression: do not expand adaptors', async (t) => { const step = plan.workflow.steps[0] as Job; - t.is(step.adaptor, 'common'); + t.is(step.adaptors[0], 'common'); }); 
test.serial('expression: set a timeout on the plan', async (t) => { @@ -147,7 +148,9 @@ test.serial('xplan: expand adaptors', async (t) => { t.truthy(result); const step = result.workflow.steps[0] as Job; - t.is(step.adaptor, '@openfn/language-common@1.0.0'); + t.is(step.adaptors[0], '@openfn/language-common@1.0.0'); + // @ts-ignore + t.is(step.adaptor, undefined); }); test.serial('xplan: do not expand adaptors', async (t) => { @@ -173,7 +176,9 @@ test.serial('xplan: do not expand adaptors', async (t) => { t.truthy(result); const step = result.workflow.steps[0] as Job; - t.is(step.adaptor, 'common@1.0.0'); + t.is(step.adaptors[0], 'common@1.0.0'); + // @ts-ignore + t.is(step.adaptor, undefined); }); test.serial('xplan: set timeout from CLI', async (t) => { @@ -250,7 +255,7 @@ test.serial('xplan: map to monorepo', async (t) => { t.truthy(result); const step = result.workflow.steps[0] as Job; - t.is(step.adaptor, '@openfn/language-common=/repo/packages/common'); + t.is(step.adaptors[0], '@openfn/language-common=/repo/packages/common'); }); test.serial('old-workflow: load a plan from workflow path', async (t) => { @@ -269,6 +274,7 @@ test.serial('old-workflow: load a plan from workflow path', async (t) => { t.deepEqual(plan.workflow.steps[0], { id: 'a', expression: 'x()', + adaptors: [], }); }); @@ -289,8 +295,8 @@ test.serial('step: allow file paths for state', async (t) => { mock({ 'test/state.json': JSON.stringify({ data: { - x: 1 - } + x: 1, + }, }), 'test/wf.json': JSON.stringify(plan), }); @@ -300,7 +306,38 @@ test.serial('step: allow file paths for state', async (t) => { const step = result.workflow.steps[0] as Job; t.deepEqual(step.state, { data: { - x: 1 - } + x: 1, + }, + }); +}); + +test.serial('xplan: support multiple adaptors', async (t) => { + const opts = { + workflowPath: 'test/wf.json', + expandAdaptors: true, + plan: {}, + }; + + const plan = createPlan([ + { + id: 'a', + expression: '.', + adaptors: ['common@1.0.0', 
'@openfn/language-collections@1.0.0'], + }, + ]); + + mock({ + 'test/wf.json': JSON.stringify(plan), }); + + const result = await loadPlan(opts, logger); + t.truthy(result); + + const step = result.workflow.steps[0] as Job; + t.deepEqual(step.adaptors, [ + '@openfn/language-common@1.0.0', + '@openfn/language-collections@1.0.0', + ]); + // @ts-ignore + t.is(step.adaptor, undefined); }); diff --git a/packages/cli/test/util/map-adaptors-to-monorepo.test.ts b/packages/cli/test/util/map-adaptors-to-monorepo.test.ts index 3c6dd9a7d..97f71e60a 100644 --- a/packages/cli/test/util/map-adaptors-to-monorepo.test.ts +++ b/packages/cli/test/util/map-adaptors-to-monorepo.test.ts @@ -87,7 +87,7 @@ test.serial('mapAdaptorsToMonorepo: map workflow', async (t) => { steps: [ { expression: '.', - adaptor: 'common', + adaptors: ['common'], }, ], }, @@ -99,7 +99,7 @@ test.serial('mapAdaptorsToMonorepo: map workflow', async (t) => { steps: [ { expression: '.', - adaptor: `common=${ABS_REPO_PATH}/packages/common`, + adaptors: [`common=${ABS_REPO_PATH}/packages/common`], }, ], }); diff --git a/packages/cli/test/util/validate-plan.test.ts b/packages/cli/test/util/validate-plan.test.ts index 9d78b2785..81dad64a4 100644 --- a/packages/cli/test/util/validate-plan.test.ts +++ b/packages/cli/test/util/validate-plan.test.ts @@ -6,108 +6,110 @@ import validate from '../../src/util/validate-plan'; const logger = createMockLogger('', { level: 'debug' }); test.afterEach(() => { - logger._reset(); -}) + logger._reset(); +}); test('throws for missing workflow', (t) => { - const plan = { - options: { - start: 'a', - } - } as ExecutionPlan; + const plan = { + options: { + start: 'a', + }, + } as ExecutionPlan; - t.throws(() => validate(plan, logger), { - message: `Missing or invalid "workflow" key in execution plan`, - }); + t.throws(() => validate(plan, logger), { + message: `Missing or invalid "workflow" key in execution plan`, + }); }); test('throws for steps not an array', (t) => { + const plan = { + 
options: { + start: 'a', + }, + workflow: { + steps: { + id: 'a', + }, + }, + } as unknown as ExecutionPlan; - const plan = { - options: { - start: 'a', - }, - workflow: { - steps: { - id: 'a' - } - }, - } as unknown as ExecutionPlan; - - t.throws(() => validate(plan, logger), { - message: 'The workflow.steps key must be an array', - }); + t.throws(() => validate(plan, logger), { + message: 'The workflow.steps key must be an array', + }); }); test('throws for a step with an adaptor but no expression', (t) => { - const plan = { - options: { - start: 'a', - }, - workflow: { - steps: [ - { - id: 'a', - adaptor: 'z' - } - ], + const plan = { + options: { + start: 'a', + }, + workflow: { + steps: [ + { + id: 'a', + adaptors: ['z'], }, - } as unknown as ExecutionPlan; + ], + }, + } as unknown as ExecutionPlan; - t.throws(() => validate(plan, logger), { - message: 'Step a with an adaptor must also have an expression', - }); + t.throws(() => validate(plan, logger), { + message: 'Step a with an adaptor must also have an expression', + }); }); test('throws for unknown key in a step', (t) => { - const plan = { - options: { - start: 'a', - }, - workflow: { - steps: [ - { - id: 'a', - key: 'z' - } - ], + const plan = { + options: { + start: 'a', + }, + workflow: { + steps: [ + { + id: 'a', + key: 'z', }, - }as unknown as ExecutionPlan; + ], + }, + } as unknown as ExecutionPlan; - t.throws(() => validate(plan, logger), { - message: 'Invalid key "key" in step a', - }); + t.throws(() => validate(plan, logger), { + message: 'Invalid key "key" in step a', + }); }); test.serial('should warn if no steps are defined', (t) => { - const plan: ExecutionPlan = { - options: { - start: 'a', - }, - workflow: { - steps: [], - }, - }; - validate(plan, logger); - const { message, level } = logger._parse(logger._history[0]); - t.is(level, 'warn'); - t.regex(message as string, /The workflow.steps array is empty/); -}) + const plan: ExecutionPlan = { + options: { + start: 'a', + }, + workflow: { + 
steps: [], + }, + }; + validate(plan, logger); + const { message, level } = logger._parse(logger._history[0]); + t.is(level, 'warn'); + t.regex(message as string, /The workflow.steps array is empty/); +}); test.serial('should warn if unknown key is passed in options', (t) => { - const plan = { - options: { - start: 'a', - key: 'z', + const plan = { + options: { + start: 'a', + key: 'z', + }, + workflow: { + steps: [ + { + id: 'a', + adaptors: [], }, - workflow: { - steps: [{ - id: 'a', - }], - }, - } as unknown as ExecutionPlan; - validate(plan, logger); - const { message, level } = logger._parse(logger._history[0]); - t.is(level, 'warn'); - t.regex(message as string, /Unrecognized option "key" in options object/); -}) + ], + }, + } as unknown as ExecutionPlan; + validate(plan, logger); + const { message, level } = logger._parse(logger._history[0]); + t.is(level, 'warn'); + t.regex(message as string, /Unrecognized option "key" in options object/); +}); diff --git a/packages/compiler/CHANGELOG.md b/packages/compiler/CHANGELOG.md index 442c9d38a..1be5521e3 100644 --- a/packages/compiler/CHANGELOG.md +++ b/packages/compiler/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/compiler +## 0.4.0 + +### Minor Changes + +- 7a85894: support multiple adaptors when adding imports +- b6de2c4: Be more accurate in calculating exports from an adaptor + ## 0.3.3 ### Patch Changes diff --git a/packages/compiler/package.json b/packages/compiler/package.json index 2e0cd5131..a0fba4a7f 100644 --- a/packages/compiler/package.json +++ b/packages/compiler/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/compiler", - "version": "0.3.3", + "version": "0.4.0", "description": "Compiler and language tooling for openfn jobs.", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/compiler/src/transforms/add-imports.ts b/packages/compiler/src/transforms/add-imports.ts index d6c9ae36c..4828388ec 100644 --- a/packages/compiler/src/transforms/add-imports.ts +++ 
b/packages/compiler/src/transforms/add-imports.ts @@ -94,11 +94,11 @@ const globalRE = new RegExp(`^${globals.join('|')}$`); export type AddImportsOptions = { ignore?: string[]; // Adaptor MUST be pre-populated for this transformer to actually do anything - adaptor: { + adaptors: Array<{ name: string; exports?: string[]; exportAll?: boolean; - }; + }>; }; export type IdentifierList = Record; @@ -157,31 +157,34 @@ export function findAllDanglingIdentifiers(ast: ASTNode) { } function visitor(path: NodePath, logger: Logger, options: AddImportsOptions) { - if (options.adaptor) { - const { name, exports, exportAll } = options.adaptor; - const ignore = - options.ignore?.reduce((obj, key) => { - obj[key] = true; - return obj; - }, {} as Record) ?? {}; + if (options.adaptors) { + const identifiers = findAllDanglingIdentifiers(path.node); - if (name) { - const identifiers = findAllDanglingIdentifiers(path.node); - const usedExports = - exports && exports.length - ? // If we have exports for this adaptor, import any dangling variables from the export list - exports.filter((e) => !ignore[e] && identifiers[e]) - : // If we have no exports for this adaptor, import anything apart from a few choice globals - Object.keys(identifiers).filter( - (i) => !ignore[i] && !globalRE.test(i) - ); - if (usedExports.length) { - // TODO maybe in trace output we can say WHY we're doing these things - addUsedImports(path, usedExports, name); - logger.info(`Added import statement for ${name}`); - if (exportAll) { - addExportAdaptor(path, name); - logger.info(`Added export * statement for ${name}`); + for (const adaptor in options.adaptors) { + const { name, exports, exportAll } = options.adaptors[adaptor]; + const ignore = + options.ignore?.reduce((obj, key) => { + obj[key] = true; + return obj; + }, {} as Record) ?? {}; + + if (name) { + const usedExports = + exports && exports.length + ? 
// If we have exports for this adaptor, import any dangling variables from the export list + exports.filter((e) => !ignore[e] && identifiers[e]) + : // If we have no exports for this adaptor, import anything apart from a few choice globals + Object.keys(identifiers).filter( + (i) => !ignore[i] && !globalRE.test(i) + ); + if (usedExports.length) { + // TODO maybe in trace output we can say WHY we're doing these things + addUsedImports(path, usedExports, name); + logger.info(`Added import statement for ${name}`); + if (exportAll) { + addExportAdaptor(path, name); + logger.info(`Added export * statement for ${name}`); + } } } } diff --git a/packages/compiler/src/util.ts b/packages/compiler/src/util.ts index 0f70df991..ee0b0c92d 100644 --- a/packages/compiler/src/util.ts +++ b/packages/compiler/src/util.ts @@ -1,5 +1,5 @@ import { readFileSync } from 'node:fs'; -import { readFile, readdir } from 'node:fs/promises'; +import { readFile } from 'node:fs/promises'; import path from 'node:path'; import { Project, describeDts } from '@openfn/describe-package'; import type { Logger } from '@openfn/logger'; @@ -24,7 +24,6 @@ export const isRelativeSpecifier = (specifier: string) => // But we may relax this later. export const preloadAdaptorExports = async ( pathToModule: string, - useMonorepo?: boolean, log?: Logger ) => { const project = new Project(); @@ -37,31 +36,11 @@ export const preloadAdaptorExports = async ( if (pkg.types) { const functionDefs = {} as Record; - // load common definitions into the project - if (pkg.name !== '@openfn/language-common') { - try { - const common = await findExports( - path.resolve( - pathToModule, - useMonorepo ? 
'../common' : '../language-common' - ), - 'types/index.d.ts', - project - ); - if (common) { - common.forEach(({ name }) => { - functionDefs[name] = true; - }); - } - } catch (e) { - log?.debug('Failed to load types from language common'); - log?.debug(e); - } - } - const adaptor = await findExports(pathToModule, pkg.types, project); adaptor.forEach(({ name }) => { - functionDefs[name] = true; + if (name !== 'default') { + functionDefs[name] = true; + } }); return Object.keys(functionDefs); @@ -82,20 +61,35 @@ const findExports = async ( types: string, project: Project ) => { - const typesRoot = path.dirname(types); - const files = await readdir(`${moduleRoot}/${typesRoot}`); - const dtsFiles = files.filter((f) => f.endsWith('.d.ts')); - const result = []; - for (const f of dtsFiles) { - const relPath = `${typesRoot}/${f}`; - const contents = await readFile(`${moduleRoot}/${relPath}`, 'utf8'); - project.createFile(contents, relPath); + const results = []; + + const contents = await readFile(`${moduleRoot}/${types}`, 'utf8'); + project.createFile(contents, types); - result.push( - ...describeDts(project, relPath, { - includePrivate: true, - }) - ); + results.push( + ...describeDts(project, types, { + includePrivate: true, + }) + ); + + // Ensure that everything in adaptor.d.ts is exported + // This is kinda cheating but it's quite safe for the time being + const typesRoot = path.dirname(types); + for (const dts of ['adaptor', 'Adaptor']) { + try { + const adaptorPath = `${moduleRoot}/${typesRoot}/${dts}.d.ts`; + const contents = await readFile(adaptorPath, 'utf8'); + project.createFile(contents, adaptorPath); + results.push( + ...describeDts(project, adaptorPath, { + includePrivate: true, + }) + ); + break; + } catch (e) { + // no problem if this throws - likely the file doesn't exist + } } - return result; + + return results; }; diff --git a/packages/compiler/test/compile.test.ts b/packages/compiler/test/compile.test.ts index 9eca1efea..abd4d220c 100644 --- 
a/packages/compiler/test/compile.test.ts +++ b/packages/compiler/test/compile.test.ts @@ -56,10 +56,12 @@ test('compile multiple operations', (t) => { test('add imports', (t) => { const options = { 'add-imports': { - adaptor: { - name: '@openfn/language-common', - exports: ['fn'], - }, + adaptors: [ + { + name: '@openfn/language-common', + exports: ['fn'], + }, + ], }, }; const source = 'fn();'; @@ -71,10 +73,12 @@ test('add imports', (t) => { test('do not add imports', (t) => { const options = { 'add-imports': { - adaptor: { - name: '@openfn/language-common', - exports: ['fn'], - }, + adaptors: [ + { + name: '@openfn/language-common', + exports: ['fn'], + }, + ], }, }; // This example already has the correct imports declared, so add-imports should do nothing @@ -87,9 +91,11 @@ test('do not add imports', (t) => { test('dumbly add imports', (t) => { const options = { 'add-imports': { - adaptor: { - name: '@openfn/language-common', - }, + adaptors: [ + { + name: '@openfn/language-common', + }, + ], }, }; // This example already has the correct imports declared, so add-imports should do nothing @@ -102,11 +108,13 @@ test('dumbly add imports', (t) => { test('add imports with export all', (t) => { const options = { 'add-imports': { - adaptor: { - name: '@openfn/language-common', - exports: ['fn'], - exportAll: true, - }, + adaptors: [ + { + name: '@openfn/language-common', + exports: ['fn'], + exportAll: true, + }, + ], }, }; const source = 'fn();'; @@ -154,10 +162,12 @@ test('compile a lazy state ($) expression', (t) => { test('compile a lazy state ($) expression with dumb imports', (t) => { const options = { 'add-imports': { - adaptor: { - name: '@openfn/language-common', - exportAll: true, - }, + adaptors: [ + { + name: '@openfn/language-common', + exportAll: true, + }, + ], }, }; const source = 'get($.data.endpoint);'; diff --git a/packages/compiler/test/transforms/add-imports.test.ts b/packages/compiler/test/transforms/add-imports.test.ts index 76eccffef..de657ee7d 
100644 --- a/packages/compiler/test/transforms/add-imports.test.ts +++ b/packages/compiler/test/transforms/add-imports.test.ts @@ -234,10 +234,12 @@ test('add imports for a test module', async (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: exports, - }, + adaptors: [ + { + name: 'test-adaptor', + exports: exports, + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -260,10 +262,12 @@ test('only add used imports for a test module', async (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: exports, - }, + adaptors: [ + { + name: 'test-adaptor', + exports: exports, + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -285,10 +289,12 @@ test("don't add imports if nothing is used", async (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: exports, - }, + adaptors: [ + { + name: 'test-adaptor', + exports: exports, + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -296,6 +302,47 @@ test("don't add imports if nothing is used", async (t) => { t.assert(transformed.body.length === 0); }); +test('add imports for multiple adaptors', async (t) => { + const ast = b.program([ + b.expressionStatement(b.identifier('x')), + b.expressionStatement(b.identifier('y')), + ]); + + const options = { + 'add-imports': { + adaptors: [ + { + name: 'adaptor-a', + exports: ['x'], + }, + { + name: 'adaptor-b', + exports: ['y'], + }, + ], + }, + }; + const transformed = transform(ast, [addImports], options) as n.Program; + + // Note that the first is y, and the second is x + const [first, second] = transformed.body as [ + n.ImportDeclaration, + n.ImportDeclaration + ]; + t.assert(n.ImportDeclaration.check(first)); + const imports_1 = first.specifiers as n.ImportSpecifier[]; + t.assert(imports_1.length === 1); + t.assert(imports_1.find((i) => i.imported.name 
=== 'y')); + t.is(first.source.value, 'adaptor-b'); + + t.assert(n.ImportDeclaration.check(second)); + const imports_2 = (second as n.ImportDeclaration) + .specifiers as n.ImportSpecifier[]; + t.assert(imports_2.length === 1); + t.assert(imports_2.find((i) => i.imported.name === 'x')); + t.is(second.source.value, 'adaptor-a'); +}); + test("don't import if a variable is declared with the same name", async (t) => { const ast = b.program([ b.variableDeclaration('const', [b.variableDeclarator(b.identifier('x'))]), @@ -307,10 +354,12 @@ test("don't import if a variable is declared with the same name", async (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: exports, - }, + adaptors: [ + { + name: 'test-adaptor', + exports: exports, + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -325,10 +374,12 @@ test('dumbly add imports for an adaptor with empty exports', (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: [], - }, + adaptors: [ + { + name: 'test-adaptor', + exports: [], + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -350,9 +401,11 @@ test('dumbly add imports for an adaptor with unknown exports', (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - }, + adaptors: [ + { + name: 'test-adaptor', + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -405,10 +458,12 @@ test("don't auto add imports for node globals", (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: [], - }, + adaptors: [ + { + name: 'test-adaptor', + exports: [], + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; @@ -430,10 +485,12 @@ test("Don't add imports for ignored identifiers", async (t) => { const options = { 'add-imports': { ignore: ['x'], - adaptor: { - name: 'test-adaptor', - exports: [], 
- }, + adaptors: [ + { + name: 'test-adaptor', + exports: [], + }, + ], }, }; @@ -460,10 +517,12 @@ test("Don't add imports from import specifiers", async (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exports: [], - }, + adaptors: [ + { + name: 'test-adaptor', + exports: [], + }, + ], }, }; @@ -480,10 +539,12 @@ test('export everything from an adaptor', (t) => { const options = { 'add-imports': { - adaptor: { - name: 'test-adaptor', - exportAll: true, - }, + adaptors: [ + { + name: 'test-adaptor', + exportAll: true, + }, + ], }, }; const transformed = transform(ast, [addImports], options) as n.Program; diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 4622515d5..c14320d5f 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,20 @@ # engine-multi +## 1.4.0 + +### Minor Changes + +- 7245bf7: Support multiple adaptors in job structures + +### Patch Changes + +- Engine: don't try to autoinstall adaptor versions with @local +- Updated dependencies [3463ff9] +- Updated dependencies [7a85894] +- Updated dependencies [b6de2c4] + - @openfn/runtime@1.5.0 + - @openfn/compiler@0.4.0 + ## 1.3.0 ### Minor Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index b61f23979..133a6af27 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.3.0", + "version": "1.4.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/autoinstall.ts b/packages/engine-multi/src/api/autoinstall.ts index 48881e813..bf4bae9a0 100644 --- a/packages/engine-multi/src/api/autoinstall.ts +++ b/packages/engine-multi/src/api/autoinstall.ts @@ -124,7 +124,7 @@ const autoinstall = async (context: ExecutionContext): Promise => { const paths: ModulePaths = {}; const 
adaptorsToLoad = []; - for (let a of adaptors) { + for (const a of adaptors) { // Ensure that this is not blacklisted if (whitelist && !whitelist.find((r) => r.exec(a))) { // TODO what if it is? For now we'll log and skip it @@ -154,6 +154,11 @@ const autoinstall = async (context: ExecutionContext): Promise => { (context.versions[name] as string[]).push(v); } + if (v === 'local') { + logger.info('Using local version of ', a); + continue; + } + // important: write back to paths with the RAW specifier paths[a] = { path: `${repoDir}/node_modules/${alias}`, @@ -168,12 +173,13 @@ const autoinstall = async (context: ExecutionContext): Promise => { // Write linker arguments back to the plan for (const step of plan.workflow.steps) { const job = step as unknown as Job; - if (paths[job.adaptor!]) { - const { name } = getNameAndVersion(job.adaptor!); - // @ts-ignore - job.linker = { - [name]: paths[job.adaptor!], - }; + for (const adaptor of job.adaptors ?? []) { + if (paths[adaptor!]) { + const { name } = getNameAndVersion(adaptor!); + job.linker ??= {}; + // @ts-ignore + job.linker[name] = paths[adaptor!]; + } } } @@ -232,8 +238,10 @@ const isInstalled = async ( export const identifyAdaptors = (plan: ExecutionPlan): Set => { const adaptors = new Set(); plan.workflow.steps - .filter((job) => (job as Job).adaptor) - .forEach((job) => adaptors.add((job as Job).adaptor!)); + .filter((job) => (job as Job).adaptors) + .map((job) => (job as Job).adaptors) + .flat() + .forEach((adaptor) => adaptors.add(adaptor as string)); return adaptors; }; diff --git a/packages/engine-multi/src/api/compile.ts b/packages/engine-multi/src/api/compile.ts index c47660adf..2aa0ecfa1 100644 --- a/packages/engine-multi/src/api/compile.ts +++ b/packages/engine-multi/src/api/compile.ts @@ -1,5 +1,5 @@ import compile, { preloadAdaptorExports, Options } from '@openfn/compiler'; -import { getModulePath } from '@openfn/runtime'; +import { getModulePath, getNameAndVersion } from '@openfn/runtime'; import type 
{ Job } from '@openfn/lexicon'; import type { Logger } from '@openfn/logger'; @@ -18,12 +18,7 @@ export default async (context: ExecutionContext) => { const job = step as Job; if (job.expression) { try { - job.expression = await compileJob( - job.expression as string, - logger, - repoDir, - job.adaptor // TODO need to expand this. Or do I? - ); + job.expression = await compileJob(job, logger, repoDir); } catch (e) { throw new CompileError(e, job.id!); } @@ -43,27 +38,32 @@ const stripVersionSpecifier = (specifier: string) => { return specifier; }; -const compileJob = async ( - job: string, - logger: Logger, - repoDir?: string, - adaptor?: string -) => { +const compileJob = async (job: Job, logger: Logger, repoDir?: string) => { + const { expression, adaptors, linker } = job; const compilerOptions: Options = { logger, }; - if (adaptor && repoDir) { - // TODO I probably dont want to log this stuff - const pathToAdaptor = await getModulePath(adaptor, repoDir, logger); - const exports = await preloadAdaptorExports(pathToAdaptor!, false, logger); - compilerOptions['add-imports'] = { - adaptor: { + if (adaptors && repoDir) { + const adaptorConfig = []; + for (const adaptor of adaptors) { + const { name } = getNameAndVersion(adaptor); + + // Support local versions by looking in job.linker for a local path to the adaptor + const pathToAdaptor = + linker && linker[name] + ? 
linker[name].path + : await getModulePath(adaptor, repoDir, logger); + const exports = await preloadAdaptorExports(pathToAdaptor!, logger); + adaptorConfig.push({ name: stripVersionSpecifier(adaptor), exports, exportAll: true, - }, + }); + } + compilerOptions['add-imports'] = { + adaptors: adaptorConfig, }; } - return compile(job, compilerOptions); + return compile(expression, compilerOptions); }; diff --git a/packages/engine-multi/src/test/util.ts b/packages/engine-multi/src/test/util.ts index 494c24e27..93916563a 100644 --- a/packages/engine-multi/src/test/util.ts +++ b/packages/engine-multi/src/test/util.ts @@ -7,7 +7,7 @@ export const createPlan = (job = {}) => steps: [ { id: 'j1', - adaptor: 'common', // not used + adaptors: ['common'], // not used configuration: {}, // not used expression: '(s) => ({ data: { answer: s.data?.input || 42 } })', diff --git a/packages/engine-multi/test/api/autoinstall.test.ts b/packages/engine-multi/test/api/autoinstall.test.ts index ffb288093..1bced2a3a 100644 --- a/packages/engine-multi/test/api/autoinstall.test.ts +++ b/packages/engine-multi/test/api/autoinstall.test.ts @@ -48,7 +48,7 @@ const createContext = ( plan: { workflow: { steps: jobs || [ - { adaptor: '@openfn/language-common@1.0.0', expression: '.' }, + { adaptors: ['@openfn/language-common@1.0.0'], expression: '.' 
}, ], }, options: {}, @@ -126,15 +126,15 @@ test('identifyAdaptors: pick out adaptors and remove duplicates', (t) => { workflow: { steps: [ { - adaptor: 'common@1.0.0', + adaptors: ['common@1.0.0'], expression: '.', }, { - adaptor: 'common@1.0.0', + adaptors: ['common@1.0.0'], expression: '.', }, { - adaptor: 'common@1.0.1', + adaptors: ['common@1.0.1'], expression: '.', }, ], @@ -150,7 +150,7 @@ test('identifyAdaptors: pick out adaptors and remove duplicates', (t) => { test.serial('autoinstall: handle @latest', async (t) => { const jobs = [ { - adaptor: 'x@latest', + adaptors: ['x@latest'], }, ]; @@ -169,7 +169,7 @@ test.serial('autoinstall: handle @latest', async (t) => { test.serial('autoinstall: handle @next', async (t) => { const jobs = [ { - adaptor: 'x@next', + adaptors: ['x@next'], }, ]; @@ -269,9 +269,15 @@ test.serial('autoinstall: install in sequence', async (t) => { handleIsInstalled: false, } as any; - const c1 = createContext(options, [{ adaptor: '@openfn/language-common@1' }]); - const c2 = createContext(options, [{ adaptor: '@openfn/language-common@2' }]); - const c3 = createContext(options, [{ adaptor: '@openfn/language-common@3' }]); + const c1 = createContext(options, [ + { adaptors: ['@openfn/language-common@1'] }, + ]); + const c2 = createContext(options, [ + { adaptors: ['@openfn/language-common@2'] }, + ]); + const c3 = createContext(options, [ + { adaptors: ['@openfn/language-common@3'] }, + ]); autoinstall(c1); await wait(1); @@ -300,10 +306,10 @@ test('autoinstall: handle two seperate, non-overlapping installs', async (t) => }; const c1 = createContext(options, [ - { adaptor: '@openfn/language-dhis2@1.0.0' }, + { adaptors: ['@openfn/language-dhis2@1.0.0'] }, ]); const c2 = createContext(options, [ - { adaptor: '@openfn/language-http@1.0.0' }, + { adaptors: ['@openfn/language-http@1.0.0'] }, ]); const p1 = await autoinstall(c1); @@ -336,7 +342,7 @@ test.serial( const job = [ { - adaptor: 'lodash@1.0.0', + adaptors: ['lodash@1.0.0'], }, ]; 
@@ -357,10 +363,10 @@ test.serial( test.serial('autoinstall: return a map to modules', async (t) => { const jobs = [ { - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], }, { - adaptor: '@openfn/language-http@1.0.0', + adaptors: ['@openfn/language-http@1.0.0'], }, ]; @@ -388,13 +394,16 @@ test.serial('autoinstall: return a map to modules', async (t) => { test.serial('autoinstall: write linker options back to the plan', async (t) => { const jobs = [ { - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], }, { - adaptor: '@openfn/language-common@2.0.0', + adaptors: [ + '@openfn/language-common@2.0.0', + '@openfn/language-collections@1.0.0', + ], }, { - adaptor: '@openfn/language-http@1.0.0', + adaptors: ['@openfn/language-http@1.0.0'], }, ]; @@ -419,6 +428,10 @@ test.serial('autoinstall: write linker options back to the plan', async (t) => { path: 'tmp/repo/node_modules/@openfn/language-common_2.0.0', version: '2.0.0', }, + '@openfn/language-collections': { + path: 'tmp/repo/node_modules/@openfn/language-collections_1.0.0', + version: '1.0.0', + }, }); t.deepEqual(c.linker, { '@openfn/language-http': { @@ -433,11 +446,11 @@ test.serial('autoinstall: support custom whitelist', async (t) => { const jobs = [ { // will be ignored - adaptor: 'x@1.0.0', + adaptors: ['x@1.0.0'], }, { // will be installed - adaptor: 'y@1.0.0', + adaptors: ['y@1.0.0'], }, ]; @@ -462,7 +475,7 @@ test.serial('autoinstall: emit an event on completion', async (t) => { let event: any; const jobs = [ { - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], version: '1.0.0', }, ]; diff --git a/packages/engine-multi/test/engine.test.ts b/packages/engine-multi/test/engine.test.ts index d93c85f62..e1dbec616 100644 --- a/packages/engine-multi/test/engine.test.ts +++ b/packages/engine-multi/test/engine.test.ts @@ -28,6 +28,7 @@ const createPlan = (expression: string = '.', id = 'a') => ({ 
steps: [ { expression, + adaptors: [], }, ], }, diff --git a/packages/engine-multi/test/errors.test.ts b/packages/engine-multi/test/errors.test.ts index 8ba54cd20..0e4d7c7df 100644 --- a/packages/engine-multi/test/errors.test.ts +++ b/packages/engine-multi/test/errors.test.ts @@ -214,7 +214,7 @@ test.serial('emit a crash error on process.exit()', (t) => { workflow: { steps: [ { - adaptor: '@openfn/helper@1.0.0', + adaptors: ['@openfn/helper@1.0.0'], expression: 'export default [exit()]', }, ], diff --git a/packages/engine-multi/test/integration.test.ts b/packages/engine-multi/test/integration.test.ts index 28dc86259..bdaf48fb9 100644 --- a/packages/engine-multi/test/integration.test.ts +++ b/packages/engine-multi/test/integration.test.ts @@ -214,7 +214,7 @@ test.serial('trigger workflow-log for adaptor logs', (t) => { // This will trigger console.log from inside the adaptor // rather than from job code directly expression: "log('hola')", - adaptor: '@openfn/helper@1.0.0', + adaptors: ['@openfn/helper@1.0.0'], }, ]); diff --git a/packages/lexicon/core.d.ts b/packages/lexicon/core.d.ts index c3c205b11..dee4a491d 100644 --- a/packages/lexicon/core.d.ts +++ b/packages/lexicon/core.d.ts @@ -3,11 +3,14 @@ import { SanitizePolicies } from '@openfn/logger'; /** * An execution plan is a portable definition of a Work Order, * or, a unit of work to execute + * This definition represents the external format - the shape of + * the plan pre-compilation before it's passed into the runtime manager + * (ie, the CLI or Worker) */ export type ExecutionPlan = { id?: UUID; // this would be the run (nee attempt) id workflow: Workflow; - options: WorkflowOptions; + options?: WorkflowOptions; }; /** @@ -20,6 +23,10 @@ export type Workflow = { name?: string; steps: Array; + + // global credentials + // (gets applied to every configuration object) + credentials?: Record; }; /** @@ -27,14 +34,14 @@ export type Workflow = { * This is some openfn expression plus metadata (adaptor, credentials) 
*/ export interface Job extends Step { - adaptor?: string; + adaptors?: string[]; expression: Expression; configuration?: object | string; state?: Omit | string; // Internal use only - // Allow module paths and versions to be overriden in the linker - // Maps to runtime.ModuleInfoMapo + // Allow module paths and versions to be overridden in the linker + // Maps to runtime.ModuleInfoMap linker?: Record< string, { diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index a30ba111c..857eacf13 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -1,5 +1,5 @@ import type { SanitizePolicies } from '@openfn/logger'; -import { State } from './core'; +import { LegacyJob, State } from './core'; export const API_VERSION: number; @@ -29,9 +29,9 @@ export type LightningPlan = { dataclip_id: string; starting_node_id: string; - triggers: Node[]; - jobs: Node[]; - edges: Edge[]; + triggers: LightningTrigger[]; + jobs: LightningJob[]; + edges: LightningEdge[]; options?: LightningPlanOptions; }; @@ -59,16 +59,23 @@ export type LightningPlanOptions = { * Sticking with the Node/Edge semantics to help distinguish the * Lightning and runtime typings */ -export type Node = { +export interface LightningNode { id: string; name?: string; body?: string; adaptor?: string; credential?: any; credential_id?: string; - type?: 'webhook' | 'cron'; // trigger only state?: State; -}; +} + +export interface LightningTrigger extends LightningNode { + type: 'webhook' | 'cron'; +} + +export interface LightningJob extends LightningNode { + adaptor: string; +} /** * This is a Path (or link) between two Jobs in a Plan. 
@@ -76,7 +83,7 @@ export type Node = { * Sticking with the Node/Edge semantics to help distinguish the * Lightning and runtime typings */ -export interface Edge { +export interface LightningEdge { id: string; source_job_id?: string; source_trigger_id?: string; diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index 9786be28c..5d79c74d3 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/lightning-mock +## 2.0.21 + +### Patch Changes + +- Updated dependencies [3463ff9] +- Updated dependencies [7245bf7] + - @openfn/runtime@1.5.0 + - @openfn/engine-multi@1.4.0 + ## 2.0.20 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 18faf906a..2a9c14853 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.20", + "version": "2.0.21", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/lightning-mock/test/server.test.ts b/packages/lightning-mock/test/server.test.ts index 5b0e9a3c1..6e9d48945 100644 --- a/packages/lightning-mock/test/server.test.ts +++ b/packages/lightning-mock/test/server.test.ts @@ -36,6 +36,7 @@ test.serial('should setup an run at /POST /run', async (t) => { user: 'john', password: 'rambo', }, + adaptor: 'abc', }, ], edges: [], diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 0e7f0f7d3..d9fd81dc5 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.5.0 + +### Minor Changes + +- 3463ff9: Support global credential object on a workflow + ## 1.4.2 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 8741d1fe5..f7172b47d 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json 
@@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.4.2", + "version": "1.5.0", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/compile-plan.ts b/packages/runtime/src/execute/compile-plan.ts index ca7c6b2cc..65a3b6e52 100644 --- a/packages/runtime/src/execute/compile-plan.ts +++ b/packages/runtime/src/execute/compile-plan.ts @@ -106,6 +106,9 @@ export default (plan: ExecutionPlan) => { start: options.start ?? workflow.steps[0]?.id!, }, }; + if (workflow.credentials) { + newPlan.workflow.credentials = workflow.credentials; + } const maybeAssign = (a: any, b: any, keys: Array) => { keys.forEach((key) => { @@ -131,10 +134,13 @@ export default (plan: ExecutionPlan) => { if (job.linker) { newStep.linker = job.linker; - } else if (job.adaptor) { + } else if (job.adaptors) { const job = step as Job; - const { name, version } = getNameAndVersion(job.adaptor!); - newStep.linker = { [name]: { version: version! } }; + newStep.linker ??= {}; + for (const adaptor of job.adaptors!) { + const { name, version } = getNameAndVersion(adaptor); + newStep.linker[name] = { version: version! }; + } } if (step.next) { diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index de9d90112..2ee45155e 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -80,7 +80,7 @@ const executeStep = async ( step: CompiledStep, input: State = {} ): Promise<{ next: StepId[]; state: any }> => { - const { opts, notify, logger, report } = ctx; + const { opts, notify, logger, report, plan } = ctx; const duration = Date.now(); @@ -104,12 +104,16 @@ const executeStep = async ( opts.callbacks?.resolveCredential! // cheat - we need to handle the error case here ); - const globals = await loadState( + const globalState = await loadState( job, opts.callbacks?.resolveState! 
// and here ); - - const state = assembleState(clone(input), configuration, globals); + const state = assembleState( + clone(input), + configuration, + globalState, + plan.workflow?.credentials + ); notify(NOTIFY_INIT_COMPLETE, { jobId, diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index 41fc80b72..c2756184e 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -36,6 +36,7 @@ export type Lazy = string | T; export type CompiledExecutionPlan = { workflow: { steps: Record; + credentials?: Record; }; options: WorkflowOptions & { start: StepId; diff --git a/packages/runtime/src/util/assemble-state.ts b/packages/runtime/src/util/assemble-state.ts index 84f5fc12e..e25b94eeb 100644 --- a/packages/runtime/src/util/assemble-state.ts +++ b/packages/runtime/src/util/assemble-state.ts @@ -13,7 +13,8 @@ const assembleData = (initialData: any, defaultData = {}) => { const assembleState = ( initialState: any = {}, // previous or initial state configuration = {}, - defaultState: any = {} // This is default state provided by the job + defaultState: any = {}, // This is default state provided by the job + globalCredentials: any = {} ) => { const obj = { ...defaultState, @@ -29,11 +30,7 @@ const assembleState = ( } Object.assign(obj, { - configuration: Object.assign( - {}, - initialState.configuration ?? 
{}, - configuration - ), + configuration: Object.assign({}, globalCredentials, configuration), data: assembleData(initialState.data, defaultState.data), }); diff --git a/packages/runtime/src/util/validate-plan.ts b/packages/runtime/src/util/validate-plan.ts index f43eeb2e1..fd1541b9e 100644 --- a/packages/runtime/src/util/validate-plan.ts +++ b/packages/runtime/src/util/validate-plan.ts @@ -70,10 +70,12 @@ export const buildModel = ({ workflow }: ExecutionPlan) => { }; const assertStart = (plan: ExecutionPlan) => { - const { start } = plan.options; - if (typeof start === 'string') { - if (!plan.workflow.steps.find(({ id }) => id === start)) { - throw new ValidationError(`Could not find start job: ${start}`); + if (plan.options) { + const { start } = plan.options; + if (typeof start === 'string') { + if (!plan.workflow.steps.find(({ id }) => id === start)) { + throw new ValidationError(`Could not find start job: ${start}`); + } } } }; diff --git a/packages/runtime/test/execute/compile-plan.test.ts b/packages/runtime/test/execute/compile-plan.test.ts index 809c2a664..922b593ea 100644 --- a/packages/runtime/test/execute/compile-plan.test.ts +++ b/packages/runtime/test/execute/compile-plan.test.ts @@ -33,6 +33,20 @@ const planWithEdge = (edge: Partial) => ({ }, }); +test('should preserve global credentials ', (t) => { + const compiledPlan = compilePlan({ + id: 'a', + workflow: { + steps: [{ id: 'a', expression: 'a' }], + credentials: { collections_token: 'j.w.t.' 
}, + }, + }); + + t.deepEqual(compiledPlan.workflow.credentials, { + collections_token: 'j.w.t.', + }); +}); + test('should preserve the start option', (t) => { const compiledPlan = compilePlan({ id: 'a', @@ -208,12 +222,12 @@ test('should write adaptor versions', (t) => { { id: 'x', expression: '.', - adaptor: 'x@1.0', + adaptors: ['x@1.0'], }, { id: 'y', expression: '.', - adaptor: 'y@1.0', + adaptors: ['y@1.0'], }, ], }, diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 5c9c65ce6..a55c5190d 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -12,8 +12,6 @@ import { } from '../src'; import run from '../src/runtime'; -type ExecutionPlanNoOptions = Omit; - test('run simple expression', async (t) => { const expression = 'export default [(s) => {s.data.done = true; return s}]'; @@ -22,7 +20,7 @@ test('run simple expression', async (t) => { }); test('run a simple workflow', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { expression: 'export default [(s) => ({ data: { done: true } })]' }, @@ -34,6 +32,22 @@ test('run a simple workflow', async (t) => { t.true(result.data.done); }); +test('run a workflow with global config', async (t) => { + const plan: ExecutionPlan = { + workflow: { + steps: [{ expression: 'export default [(s) => state.configuration];' }], + credentials: { + collection_token: 'j.w.t', + }, + }, + }; + + const result: any = await run(plan); + t.deepEqual(result, { + collection_token: 'j.w.t', + }); +}); + test('run a workflow and notify major events', async (t) => { const counts: Record = {}; const notify = (name: string) => { @@ -47,7 +61,7 @@ test('run a workflow and notify major events', async (t) => { notify, }; - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [{ expression: 'export default [(s) => s]' }], }, @@ -76,7 +90,7 @@ test('notify job error even 
after fail', async (t) => { notify, }; - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { id: 'a', expression: 'export default [(s) => s.data.x = s.err.z ]' }, @@ -102,7 +116,7 @@ test('notify job error even after crash', async (t) => { notify, }; - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [{ id: 'a', expression: 'export default [() => s]' }] }, }; @@ -139,7 +153,7 @@ test('resolve a credential', async (t) => { }); test('resolve initial state', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -174,7 +188,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => { notify, }; - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { id: 'a', expression: 'export default [(s) => s]', next: { b: true } }, @@ -192,7 +206,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => { }); test('run a workflow with state and parallel branching', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -239,7 +253,7 @@ test('run a workflow with state and parallel branching', async (t) => { }); test('run a workflow with a leaf step called multiple times', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -294,7 +308,7 @@ test('run a workflow with a leaf step called multiple times', async (t) => { // TODO this test sort of shows why input state on the plan object is a bit funky // running the same plan with two inputs is pretty clunky test('run a workflow with state and conditional branching', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -422,7 +436,7 @@ test('run a workflow with a start and end', async (t) => { }); test('run a workflow with a trigger node', async (t) 
=> { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -442,7 +456,7 @@ test('run a workflow with a trigger node', async (t) => { }); test('prefer initial state to inline state', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -464,7 +478,7 @@ test('prefer initial state to inline state', async (t) => { }); test('Allow a job to return undefined', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [{ expression: 'export default [() => {}]' }], }, @@ -475,7 +489,7 @@ test('Allow a job to return undefined', async (t) => { }); test('log errors, write to state, and continue', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -507,7 +521,7 @@ test('log errors, write to state, and continue', async (t) => { }); test('log job code to the job logger', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -529,7 +543,7 @@ test('log job code to the job logger', async (t) => { }); test('log and serialize an error to the job logger', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -556,7 +570,7 @@ test('log and serialize an error to the job logger', async (t) => { }); test('error reports can be overwritten', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -580,7 +594,7 @@ test('error reports can be overwritten', async (t) => { // This tracks current behaviour but I don't know if it's right test('stuff written to state before an error is preserved', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -609,7 +623,7 @@ test('data can be an array (expression)', async (t) => { }); test('data can be an 
array (workflow)', async (t) => { - const plan: ExecutionPlanNoOptions = { + const plan: ExecutionPlan = { workflow: { steps: [ { @@ -777,21 +791,21 @@ test('run a workflow using the repo using a specific version', async (t) => { }); test('run a workflow using the repo with multiple versions of the same adaptor', async (t) => { - const plan = { + const plan: ExecutionPlan = { workflow: { steps: [ { id: 'a', expression: `import result from 'ultimate-answer'; export default [(s) => { s.data.a = result; return s;}];`, - adaptor: 'ultimate-answer@1.0.0', + adaptors: ['ultimate-answer@1.0.0'], next: { b: true }, }, { id: 'b', expression: `import result from 'ultimate-answer'; export default [(s) => { s.data.b = result; return s;}];`, - adaptor: 'ultimate-answer@2.0.0', + adaptors: ['ultimate-answer@2.0.0'], }, ], }, diff --git a/packages/runtime/test/util/assemble-state.test.ts b/packages/runtime/test/util/assemble-state.test.ts index eac478b93..b27c99c13 100644 --- a/packages/runtime/test/util/assemble-state.test.ts +++ b/packages/runtime/test/util/assemble-state.test.ts @@ -67,7 +67,7 @@ test('Initial data does not have to be an object', (t) => { }); }); -test('merges default and initial config objects', (t) => { +test('does not merge default and initial config objects', (t) => { const initial = { configuration: { x: 1 } }; const defaultState = undefined; const config = { y: 1 }; @@ -75,7 +75,6 @@ test('merges default and initial config objects', (t) => { const result = assembleState(initial, config, defaultState); t.deepEqual(result, { configuration: { - x: 1, y: 1, }, data: {}, @@ -95,3 +94,49 @@ test('configuration overrides initialState.configuration', (t) => { data: {}, }); }); + +test('global credentials should be added', (t) => { + const initial = {}; + const defaultState = undefined; + const config = undefined; + const global = { collection_token: 'j.w.t' }; + + const result = assembleState(initial, config, defaultState, global); + t.deepEqual(result, { + 
configuration: { + collection_token: 'j.w.t', + }, + data: {}, + }); +}); + +test('global credentials should be merged in', (t) => { + const initial = { configuration: { x: 1 } }; + const defaultState = undefined; + const config = { x: 2 }; + const global = { collection_token: 'j.w.t' }; + + const result = assembleState(initial, config, defaultState, global); + t.deepEqual(result, { + configuration: { + x: 2, + collection_token: 'j.w.t', + }, + data: {}, + }); +}); + +test('local credentials should override global credentials', (t) => { + const initial = { configuration: { x: 1 } }; + const defaultState = undefined; + const config = { collection_token: 'x.y.z' }; + const global = { collection_token: 'j.w.t' }; + + const result = assembleState(initial, config, defaultState, global); + t.deepEqual(result, { + configuration: { + collection_token: 'x.y.z', + }, + data: {}, + }); +}); diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index cf78dec30..bcbd69ae6 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,33 @@ # ws-worker +## 1.8.0 + +### Minor Changes + +- fd0e499: Support collections +- bcd82e9: Accept collection version at startup (as arg or auto-looked-up from npm) +- Support @local adaptor versions (which map to the monorepo) + +### Patch Changes + +- 1c79dc1: Append the collections adaptor to steps that need it +- b15f151: Update worker to use adaptors as an array on xplans. Internal only change. +- Updated dependencies [3463ff9] +- Updated dependencies [7245bf7] + - @openfn/runtime@1.5.0 + - @openfn/engine-multi@1.4.0 + +## 1.7.1 + +### Patch Changes + +- 1c79dc1: Append the collections adaptor to steps that need it +- b15f151: Update worker to use adaptors as an array on xplans. Internal only change. 
+- Updated dependencies [3463ff9] +- Updated dependencies [7245bf7] + - @openfn/runtime@2.0.0 + - @openfn/engine-multi@1.4.0 + ## 1.7.0 ### Minor Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index c10dac7ce..eef4ddfef 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.7.0", + "version": "1.8.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/channels/run.ts b/packages/ws-worker/src/channels/run.ts index 9cde5c97a..a8f925084 100644 --- a/packages/ws-worker/src/channels/run.ts +++ b/packages/ws-worker/src/channels/run.ts @@ -1,9 +1,7 @@ -import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon'; import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning'; import type { Logger } from '@openfn/logger'; import { getWithReply } from '../util'; -import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan'; import { GET_PLAN } from '../events'; import type { Channel, Socket } from '../types'; @@ -20,9 +18,7 @@ const joinRunChannel = ( ) => { return new Promise<{ channel: Channel; - plan: ExecutionPlan; - options: WorkerRunOptions; - input: Lazy; + run: LightningPlan; }>((resolve, reject) => { // TMP - lightning seems to be sending two responses to me // just for now, I'm gonna gate the handling here @@ -38,9 +34,8 @@ const joinRunChannel = ( if (!didReceiveOk) { didReceiveOk = true; logger.success(`connected to ${channelName}`, e); - const { plan, options, input } = await loadRun(channel); - logger.debug('converted run as execution plan:', plan); - resolve({ channel, plan, options, input }); + const run = await getWithReply(channel, GET_PLAN); + resolve({ channel, run }); } }) .receive('error', (err: any) => { @@ -65,10 +60,3 @@ const joinRunChannel = ( }; export default joinRunChannel; - 
-export async function loadRun(channel: Channel) { - // first we get the run body through the socket - const runBody = await getWithReply(channel, GET_PLAN); - // then we generate the execution plan - return convertRun(runBody as LightningPlan); -} diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index c77fb755c..d04a4e3e6 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -1,4 +1,8 @@ import { EventEmitter } from 'node:events'; + +import { promisify } from 'node:util'; +import { exec as _exec } from 'node:child_process'; + import Koa from 'koa'; import bodyParser from 'koa-bodyparser'; import koaLogger from 'koa-logger'; @@ -18,6 +22,9 @@ import connectToWorkerQueue from './channels/worker-queue'; import type { Server } from 'http'; import type { RuntimeEngine } from '@openfn/engine-multi'; import type { Socket, Channel } from './types'; +import { convertRun } from './util'; + +const exec = promisify(_exec); export type ServerOptions = { maxWorkflows?: number; @@ -36,6 +43,9 @@ export type ServerOptions = { socketTimeoutSeconds?: number; payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log) + collectionsVersion?: string; + collectionsUrl?: string; + monorepoDir?: string; }; // this is the server/koa API @@ -50,6 +60,9 @@ export interface ServerApp extends Koa { engine: RuntimeEngine; options: ServerOptions; workloop?: Workloop; + // What version of the collections adaptor should we use? 
+  // Can be set through CLI, or else it'll look up latest on startup
+  collectionsVersion?: string;

   execute: ({ id, token }: ClaimRun) => Promise;
   destroy: () => void;
@@ -137,6 +150,33 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
     .on('error', onError);
 }

+async function setupCollections(options: ServerOptions, logger: Logger) {
+  if (options.collectionsUrl) {
+    logger.log('Using collections endpoint at ', options.collectionsUrl);
+  } else {
+    logger.warn(
+      'WARNING: no collections URL provided. Collections service will not be enabled.'
+    );
+    logger.warn(
+      'Pass --collections-url or set WORKER_COLLECTIONS_URL to set the url'
+    );
+    return;
+  }
+
+  if (options.collectionsVersion && options.collectionsVersion !== 'latest') {
+    logger.log(
+      'Using collections version from CLI/env: ',
+      options.collectionsVersion
+    );
+    return options.collectionsVersion;
+  }
+  const { stdout: version } = await exec(
+    'npm view @openfn/language-collections@latest version'
+  );
+  logger.log('Using collections version from @latest: ', version);
+  return version;
+}
+
 function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
   const logger = options.logger || createMockLogger();
   const port = options.port || DEFAULT_PORT;
@@ -195,12 +235,27 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
     const start = Date.now();
     app.workflows[id] = true;

-    const {
-      channel: runChannel,
-      plan,
-      options = {},
-      input,
-    } = await joinRunChannel(app.socket, token, id, logger);
+    const { channel: runChannel, run } = await joinRunChannel(
+      app.socket,
+      token,
+      id,
+      logger
+    );
+
+    const { plan, options, input } = convertRun(run, {
+      collectionsVersion: app.options.collectionsVersion,
+      monorepoPath: app.options.monorepoDir,
+    });
+    logger.debug('converted run body into execution plan:', plan);
+
+    // Setup collections
+    if (plan.workflow.credentials?.collections_token) {
+      
plan.workflow.credentials.collections_token = token; + } + if (plan.workflow.credentials?.collections_endpoint) { + plan.workflow.credentials.collections_endpoint = + app.options.collectionsUrl; + } // Default the payload limit if it's not otherwise set on the run options if (!('payloadLimitMb' in options)) { @@ -266,7 +321,10 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.use(router.routes()); if (options.lightning) { - connect(app, logger, options); + setupCollections(options, logger).then((version) => { + app.collectionsVersion = version; + connect(app, logger, options); + }); } else { logger.warn('No lightning URL provided'); } diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 56e22ad0a..a6ea784f2 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -28,7 +28,6 @@ const [minBackoff, maxBackoff] = args.backoff function engineReady(engine: any) { logger.debug('Creating worker instance'); - const workerOptions: ServerOptions = { port: args.port, lightning: args.lightning, @@ -42,6 +41,9 @@ function engineReady(engine: any) { }, maxWorkflows: args.capacity, payloadLimitMb: args.payloadMemory, + collectionsVersion: args.collectionsVersion, + collectionsUrl: args.collectionsUrl, + monorepoDir: args.monorepoDir, }; if (args.lightningPublicKey) { diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index 1c7a2bf83..46516d335 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -8,21 +8,24 @@ const DEFAULT_SOCKET_TIMEOUT_SECONDS = 10; type Args = { _: string[]; - port?: number; + backoff: string; + capacity?: number; + collectionsUrl?: string; + collectionsVersion?: string; lightning?: string; - repoDir?: string; - secret?: string; - loop?: boolean; - log?: LogLevel; lightningPublicKey?: string; + log?: LogLevel; + loop?: boolean; + maxRunDurationSeconds: number; mock?: boolean; - backoff: string; 
- capacity?: number; - runMemory?: number; + monorepoDir?: string; payloadMemory?: number; - statePropsToRemove?: string[]; - maxRunDurationSeconds: number; + port?: number; + repoDir?: string; + runMemory?: number; + secret?: string; socketTimeoutSeconds?: number; + statePropsToRemove?: string[]; }; type ArgTypes = string | string[] | number | undefined; @@ -51,6 +54,8 @@ export default function parseArgs(argv: string[]): Args { const { WORKER_BACKOFF, WORKER_CAPACITY, + WORKER_COLLECTIONS_VERSION, + WORKER_COLLECTIONS_URL, WORKER_LIGHTNING_PUBLIC_KEY, WORKER_LIGHTNING_SERVICE_URL, WORKER_LOG_LEVEL, @@ -62,6 +67,7 @@ export default function parseArgs(argv: string[]): Args { WORKER_SECRET, WORKER_STATE_PROPS_TO_REMOVE, WORKER_SOCKET_TIMEOUT_SECONDS, + OPENFN_ADAPTORS_REPO, } = process.env; const parser = yargs(hideBin(argv)) @@ -81,6 +87,11 @@ export default function parseArgs(argv: string[]): Args { description: 'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR', }) + .option('monorepo-dir', { + alias: 'm', + description: + 'Path to the adaptors mono repo, from where @local adaptors will be loaded. Env: OPENFN_ADAPTORS_REPO', + }) .option('secret', { alias: 's', description: @@ -135,6 +146,16 @@ export default function parseArgs(argv: string[]): Args { description: 'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS', type: 'number', + }) + .option('collections-url', { + alias: ['c'], + description: + 'URL to the Collections service endpoint. Required for Collections, eg, https://app.openfn.org/collections. 
Env: WORKER_COLLECTIONS_URL',
+    })
+    .option('collections-version', {
+      description:
+        'The version of the collections adaptor to use for all runs on this worker instance. Env: WORKER_COLLECTIONS_VERSION',
+      type: 'string',
+    });

   const args = parser.parse() as Args;

@@ -148,6 +169,7 @@ export default function parseArgs(argv: string[]): Args {
       'ws://localhost:4000/worker'
     ),
     repoDir: setArg(args.repoDir, WORKER_REPO_DIR),
+    monorepoDir: setArg(args.monorepoDir, OPENFN_ADAPTORS_REPO),
     secret: setArg(args.secret, WORKER_SECRET),
     lightningPublicKey: setArg(
       args.lightningPublicKey,
@@ -173,5 +195,10 @@ export default function parseArgs(argv: string[]): Args {
       WORKER_SOCKET_TIMEOUT_SECONDS,
       DEFAULT_SOCKET_TIMEOUT_SECONDS
     ),
+    collectionsVersion: setArg(
+      args.collectionsVersion,
+      WORKER_COLLECTIONS_VERSION
+    ),
+    collectionsUrl: setArg(args.collectionsUrl, WORKER_COLLECTIONS_URL),
   } as Args;
 }
diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts
index df8d4b96c..45a5e52e7 100644
--- a/packages/ws-worker/src/util/convert-lightning-plan.ts
+++ b/packages/ws-worker/src/util/convert-lightning-plan.ts
@@ -1,4 +1,5 @@
 import crypto from 'node:crypto';
+import path from 'node:path';
 import type {
   Step,
   StepId,
@@ -10,8 +11,9 @@ import type {
   WorkflowOptions,
   Lazy,
 } from '@openfn/lexicon';
-import { LightningPlan, Edge } from '@openfn/lexicon/lightning';
+import { LightningPlan, LightningEdge } from '@openfn/lexicon/lightning';
 import { ExecuteOptions } from '@openfn/engine-multi';
+import { getNameAndVersion } from '@openfn/runtime';

 export const conditions: Record string | null> = {
@@ -22,7 +24,7 @@ export const conditions: Record string | null> =
   always: (_upstreamId: string) => null,
 };

-const mapEdgeCondition = (edge: Edge) => {
+const mapEdgeCondition = (edge: LightningEdge) => {
   const { condition } = edge;
   if (condition && condition in conditions) {
     const upstream = (edge.source_job_id ||
edge.source_trigger_id) as string; @@ -31,7 +33,7 @@ const mapEdgeCondition = (edge: Edge) => { return condition; }; -const mapTriggerEdgeCondition = (edge: Edge) => { +const mapTriggerEdgeCondition = (edge: LightningEdge) => { const { condition } = edge; // This handles cron triggers with undefined conditions and the 'always' string. if (condition === undefined || condition === 'always') return true; @@ -46,9 +48,53 @@ export type WorkerRunOptions = ExecuteOptions & { payloadLimitMb?: number; }; +type ConversionOptions = { + collectionsVersion?: string; + monorepoPath?: string; +}; + export default ( - run: LightningPlan + run: LightningPlan, + options: ConversionOptions = {} ): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy } => { + const { collectionsVersion, monorepoPath } = options; + + const appendLocalVersions = (job: Job) => { + if (monorepoPath && job.adaptors!) { + for (const adaptor of job.adaptors) { + const { name, version } = getNameAndVersion(adaptor); + if (monorepoPath && version === 'local') { + const shortName = name.replace('@openfn/language-', ''); + const localPath = path.resolve(monorepoPath, 'packages', shortName); + job.linker ??= {}; + job.linker[name] = { + path: localPath, + version: 'local', + }; + } + } + } + return job; + }; + + // This function will look at every step and decide whether the collections adaptor + // should be added to the array + const appendCollectionsAdaptor = ( + plan: ExecutionPlan, + collectionsVersion: string = 'latest' + ) => { + let hasCollections; + plan.workflow.steps.forEach((step) => { + const job = step as Job; + if (job.expression?.match(/(collections\.)/)) { + hasCollections = true; + job.adaptors ??= []; + job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`); + } + }); + return hasCollections; + }; + // Some options get mapped straight through to the runtime's workflow options const runtimeOpts: Omit = {}; @@ -88,7 +134,7 @@ export default ( const nodes: Record = {}; - 
const edges: Edge[] = run.edges ?? []; + const edges: LightningEdge[] = run.edges ?? []; // We don't really care about triggers, it's mostly just a empty node if (run.triggers?.length) { @@ -125,7 +171,7 @@ export default ( id, configuration: step.credential || step.credential_id, expression: step.body!, - adaptor: step.adaptor, + adaptors: step.adaptor ? [step.adaptor] : [], }; if (step.name) { @@ -170,6 +216,24 @@ export default ( plan.workflow.name = run.name; } + if (collectionsVersion) { + const hasCollections = appendCollectionsAdaptor( + plan as ExecutionPlan, + collectionsVersion + ); + if (hasCollections) { + plan.workflow.credentials = { + collections_token: true, + collections_endpoint: true, + }; + } + } + + // Find any @local versions and set them up properly + for (const step of plan.workflow.steps) { + appendLocalVersions(step as Job); + } + return { plan: plan as ExecutionPlan, options: engineOpts, diff --git a/packages/ws-worker/src/util/create-run-state.ts b/packages/ws-worker/src/util/create-run-state.ts index bb30fa05a..deaa76aad 100644 --- a/packages/ws-worker/src/util/create-run-state.ts +++ b/packages/ws-worker/src/util/create-run-state.ts @@ -22,8 +22,8 @@ export default (plan: ExecutionPlan, input?: Lazy): RunState => { // find the first job const jobs = plan.workflow.steps as Job[]; let startNode = jobs[0]; - if (plan.options.start) { - startNode = jobs.find(({ id }) => id === plan.options.start)!; + if (plan.options?.start) { + startNode = jobs.find(({ id }) => id === plan.options?.start)!; } const initialRuns: string[] = []; diff --git a/packages/ws-worker/test/api/destroy.test.ts b/packages/ws-worker/test/api/destroy.test.ts index 2b47105c1..dce7c2992 100644 --- a/packages/ws-worker/test/api/destroy.test.ts +++ b/packages/ws-worker/test/api/destroy.test.ts @@ -24,6 +24,7 @@ test.beforeEach(async () => { lightning: `ws://localhost:${lightningPort}/worker`, port: workerPort, backoff: { min: 10, max: 20 }, + collectionsVersion: '1.0.0', 
}); }); diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 16ccdf089..926804630 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -459,7 +459,7 @@ test('execute should call all events on the socket', async (t) => { { id: 'trigger', configuration: 'a', - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], expression: 'fn(() => console.log("x"))', }, ], diff --git a/packages/ws-worker/test/channels/run.test.ts b/packages/ws-worker/test/channels/run.test.ts index 37dfa4d72..71a76fe2a 100644 --- a/packages/ws-worker/test/channels/run.test.ts +++ b/packages/ws-worker/test/channels/run.test.ts @@ -1,79 +1,24 @@ import test from 'ava'; import { mockSocket, mockChannel } from '../../src/mock/sockets'; -import joinRunChannel, { loadRun } from '../../src/channels/run'; +import joinRunChannel from '../../src/channels/run'; import { GET_PLAN } from '../../src/events'; import { runs } from '../mock/data'; import { createMockLogger } from '@openfn/logger'; -test('loadRun should get the run body', async (t) => { - const run = runs['run-1']; - let didCallGetRun = false; - const channel = mockChannel({ - [GET_PLAN]: () => { - // TODO should be no payload (or empty payload) - didCallGetRun = true; - return run; - }, - }); - - await loadRun(channel); - t.true(didCallGetRun); -}); - -test('loadRun should return an execution plan and options', async (t) => { - const run = { - ...runs['run-1'], - options: { - sanitize: 'obfuscate', - run_timeout_ms: 10, - }, - }; - - const channel = mockChannel({ - [GET_PLAN]: () => run, - }); - - const { plan, options } = await loadRun(channel); - t.like(plan, { - id: 'run-1', - workflow: { - steps: [ - { - id: 'job-1', - configuration: 'a', - expression: 'fn(a => a)', - adaptor: '@openfn/language-common@1.0.0', - }, - ], - }, - }); - t.is(options.sanitize, 'obfuscate'); - t.is(options.runTimeoutMs, 
10); -}); - -test('should join an run channel with a token', async (t) => { +test('should join a run channel with a token and return a raw lightning run', async (t) => { const logger = createMockLogger(); const socket = mockSocket('www', { 'run:a': mockChannel({ // Note that the validation logic is all handled here join: () => ({ status: 'ok' }), - [GET_PLAN]: () => ({ - id: 'a', - options: { run_timeout_ms: 10 }, - }), + [GET_PLAN]: () => runs['run-1'], }), }); - const { channel, plan, options } = await joinRunChannel( - socket, - 'x.y.z', - 'a', - logger - ); + const { channel, run } = await joinRunChannel(socket, 'x.y.z', 'a', logger); t.truthy(channel); - t.deepEqual(plan, { id: 'a', workflow: { steps: [] }, options: {} }); - t.deepEqual(options, { runTimeoutMs: 10 }); + t.deepEqual(run, runs['run-1']); }); test('should fail to join an run channel with an invalid token', async (t) => { diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 42ed02230..4614442fd 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -42,14 +42,15 @@ test.before(async () => { lightning: urls.lng, secret: 'abc', maxWorkflows: 1, - + collectionsVersion: '1.0.0', + collectionsUrl: 'www', // Note that if this is not passed, // JWT verification will be skipped runPublicKey: keys.public, backoff: { min: 1, max: 1000, - } + }, }); }); @@ -110,109 +111,98 @@ test.serial( } ); -test.serial( - `should not claim while at capacity, then resume`, - (t) => { - return new Promise((done) => { - - let runIsActive = false; - let runComplete = false; - let didClaimAfterComplete = false; +test.serial(`should not claim while at capacity, then resume`, (t) => { + return new Promise((done) => { + let runIsActive = false; + let runComplete = false; + let didClaimAfterComplete = false; - const run = { - id: `a${++rollingRunId}`, - jobs: [ - { - id: 'j', - adaptor: '@openfn/language-common@1.0.0', - body: 
`fn(() => new Promise((resolve) => { + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { setTimeout(resolve, 500) }))`, - }, - ], - }; - - - lng.on(e.CLAIM, () => { - if (runIsActive) { - t.fail('Claimed while run is active') - } - if (runComplete) { - didClaimAfterComplete = true; - } - }); + }, + ], + }; - lng.onSocketEvent(e.RUN_START, run.id, () => { - runIsActive = true; - }) + lng.on(e.CLAIM, () => { + if (runIsActive) { + t.fail('Claimed while run is active'); + } + if (runComplete) { + didClaimAfterComplete = true; + } + }); - lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { - runIsActive = false; - runComplete = true; + lng.onSocketEvent(e.RUN_START, run.id, () => { + runIsActive = true; + }); - setTimeout(() => { - t.true(didClaimAfterComplete); - done() - }, 10) - }); + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + runIsActive = false; + runComplete = true; - lng.enqueueRun(run); + setTimeout(() => { + t.true(didClaimAfterComplete); + done(); + }, 10); }); - } -); -test.serial( - `should reset backoff after claim`, - (t) => { - return new Promise((done) => { + lng.enqueueRun(run); + }); +}); - let lastClaim = Date.now() - let lastClaimDiff = 0; +test.serial(`should reset backoff after claim`, (t) => { + return new Promise((done) => { + let lastClaim = Date.now(); + let lastClaimDiff = 0; - const run = { - id: `a${++rollingRunId}`, - jobs: [ - { - id: 'j', - adaptor: '@openfn/language-common@1.0.0', - body: `fn(() => new Promise((resolve) => { + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { setTimeout(resolve, 500) }))`, - }, - ], - }; - + }, + ], + }; - lng.on(e.CLAIM, () => { - lastClaimDiff = Date.now() - lastClaim; - lastClaim = Date.now() - }); + lng.on(e.CLAIM, () => { + lastClaimDiff = Date.now() - lastClaim; + lastClaim = 
Date.now(); + }); - lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { - // set this articially high - if there are no more claims, the test will fail - lastClaimDiff = 10000; - - // When the run is finished, the claims should resume - // but with a smaller backoff - setTimeout(() => { - t.log('Backoff after run:', lastClaimDiff) - t.true(lastClaimDiff < 5) - done() - }, 10) - }); + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + // set this articially high - if there are no more claims, the test will fail + lastClaimDiff = 10000; - + // When the run is finished, the claims should resume + // but with a smaller backoff setTimeout(() => { - t.log('Backoff before run:', lastClaimDiff) - // let the backoff increase a bit - // the last claim diff should be at least 30ms - t.true(lastClaimDiff > 30) - - lng.enqueueRun(run); - }, 600) + t.log('Backoff after run:', lastClaimDiff); + t.true(lastClaimDiff < 5); + done(); + }, 10); }); - } -); + + setTimeout(() => { + t.log('Backoff before run:', lastClaimDiff); + // let the backoff increase a bit + // the last claim diff should be at least 20ms + t.true(lastClaimDiff > 20); + + lng.enqueueRun(run); + }, 600); + }); +}); test.todo('worker should log when a run token is verified'); @@ -269,6 +259,29 @@ test.serial('should run a run which returns initial state', async (t) => { }); }); +test.serial('should run a run with the collections adaptor', async (t) => { + return new Promise((done) => { + const run = { + id: 'run-1', + jobs: [ + { + // This should be enough to fake the worker into + // loading the collections machinery + body: 'fn((s) => /* collections.get */ s.configuration)', + }, + ], + }; + + lng.waitForResult(run.id).then((result: any) => { + t.is(result.collections_endpoint, 'www'); + t.is(typeof result.collections_token, 'string'); + done(); + }); + + lng.enqueueRun(run); + }); +}); + // A basic high level integration test to ensure the whole loop works // This checks the events received by the lightning 
websocket test.serial( diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index aa66c2aa6..28842769e 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -17,7 +17,7 @@ const sampleWorkflow = { steps: [ { id: 'j1', - adaptor: 'common@1.0.0', + adaptors: ['common@1.0.0'], expression: 'fn(() => ({ data: { x: 10 } }))', }, ], @@ -85,7 +85,7 @@ test.serial('Dispatch complete events for a job', async (t) => { test.serial('Dispatch error event for a crash', async (t) => { const wf = createPlan({ id: 'j1', - adaptor: 'common@1.0.0', + adaptors: ['common@1.0.0'], expression: 'fn(() => ( @~!"@£!4 )', }); @@ -146,7 +146,7 @@ test.serial('listen to events', async (t) => { const wf = createPlan({ id: 'j1', - adaptor: 'common@1.0.0', + adaptors: ['common@1.0.0'], expression: 'export default [() => { console.log("x"); }]', }); @@ -236,7 +236,7 @@ test.serial( // @ts-ignore const workflow = createPlan({ id: 'j1', - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], }); let didCallEvent = false; diff --git a/packages/ws-worker/test/reasons.test.ts b/packages/ws-worker/test/reasons.test.ts index c66c2ef40..018776aea 100644 --- a/packages/ws-worker/test/reasons.test.ts +++ b/packages/ws-worker/test/reasons.test.ts @@ -209,7 +209,7 @@ test('exception: autoinstall error', async (t) => { const plan = createPlan({ id: 'a', expression: '.', - adaptor: '@openfn/language-common@1.0.0', + adaptors: ['@openfn/language-common@1.0.0'], }); // TODO I also need to ensure that this calls run:complete diff --git a/packages/ws-worker/test/server.test.ts b/packages/ws-worker/test/server.test.ts index 1ac45a6cc..577dd04e2 100644 --- a/packages/ws-worker/test/server.test.ts +++ b/packages/ws-worker/test/server.test.ts @@ -8,6 +8,7 @@ test.before(async () => { port: 2323, secret: 'abc', maxWorkflows: 1, + collectionsVersion: 
'1.0.0', }); }); diff --git a/packages/ws-worker/test/util.ts b/packages/ws-worker/test/util.ts index df70a3c99..d29aa3e5b 100644 --- a/packages/ws-worker/test/util.ts +++ b/packages/ws-worker/test/util.ts @@ -1,5 +1,5 @@ import { ExecutionPlan, Job } from '@openfn/lexicon'; -import { Edge, Node } from '@openfn/lexicon/lightning'; +import { LightningEdge, LightningNode } from '@openfn/lexicon/lightning'; import crypto from 'node:crypto'; export const wait = (fn: () => any, maxRuns = 100) => @@ -34,7 +34,7 @@ export const sleep = (delay = 100) => setTimeout(resolve, delay); }); -export const createPlan = (...steps: Job[]) => +export const createPlan = (...steps: Partial[]) => ({ id: crypto.randomUUID(), workflow: { @@ -48,13 +48,13 @@ export const createEdge = (from: string, to: string) => id: `${from}-${to}`, source_job_id: from, target_job_id: to, - } as Edge); + } as LightningEdge); export const createJob = (body?: string, id?: string) => ({ id: id || crypto.randomUUID(), body: body || `fn((s) => s)`, - } as Node); + } as LightningNode); export const createRun = (jobs = [], edges = [], triggers = []) => ({ id: crypto.randomUUID(), diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index 0d3ef653a..ed8f9da31 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -1,5 +1,9 @@ import test from 'ava'; -import type { LightningPlan, Node } from '@openfn/lexicon/lightning'; +import type { + LightningPlan, + LightningJob, + LightningTrigger, +} from '@openfn/lexicon/lightning'; import convertPlan, { conditions } from '../../src/util/convert-lightning-plan'; import { ConditionalStepEdge, Job } from '@openfn/lexicon'; @@ -11,7 +15,7 @@ const createNode = (props = {}) => adaptor: 'common', credential_id: 'y', ...props, - } as Node); + } as LightningJob); const createEdge = (from: string, to: string, props 
= {}) => ({ id: `${from}-${to}`, @@ -26,13 +30,13 @@ const createTrigger = (props = {}) => id: 't', type: 'cron', ...props, - } as Node); + } as LightningTrigger); // Creates a runtime job node const createJob = (props = {}) => ({ id: 'a', expression: 'x', - adaptor: 'common', + adaptors: ['common'], configuration: 'y', ...props, }); @@ -584,3 +588,112 @@ test('convert edge condition always', (t) => { const edge = job.next as Record; t.false(edge.b.hasOwnProperty('condition')); }); + +test('append the collections adaptor to jobs that use it', (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ id: 'a' }), + createNode({ + id: 'b', + body: 'collections.each("c", "k", (state) => state)', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('a', 'b')], + }; + const { plan } = convertPlan(run as LightningPlan, { + collectionsVersion: '1.0.0', + }); + + const [_t, a, b] = plan.workflow.steps; + + // @ts-ignore + t.deepEqual(a.adaptors, ['common']); + // @ts-ignore + t.deepEqual(b.adaptors, ['common', '@openfn/language-collections@1.0.0']); +}); + +test('append the collections credential to jobs that use it', (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ id: 'a' }), + createNode({ + id: 'b', + body: 'collections.each("c", "k", (state) => state)', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('a', 'b')], + }; + const { plan } = convertPlan(run as LightningPlan, { + collectionsVersion: '1.0.0', + }); + + const creds = plan.workflow.credentials; + + t.deepEqual(creds, { + collections_token: true, + collections_endpoint: true, + }); +}); + +test("Don't set up collections if no version is passed", (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ + id: 'a', + body: 'collections.each("c", "k", (state) => state)', + adaptor: 'common', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + const { plan } = convertPlan(run as 
LightningPlan); + + const [_t, a] = plan.workflow.steps; + + t.deepEqual((a as Job).adaptors, ['common']); + t.falsy(plan.workflow.credentials); +}); + +test('Use local paths', (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ + id: 'a', + body: 'collections.each("c", "k", (state) => state)', + adaptor: 'common@local', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + collectionsVersion: 'local', + monorepoPath: '/adaptors', + }); + + const [_t, a] = plan.workflow.steps as any[]; + + t.deepEqual(a.adaptors, [ + 'common@local', + '@openfn/language-collections@local', + ]); + t.deepEqual(a.linker, { + // The adaptor is not exapanded into long form, could be a problem + common: { + path: '/adaptors/packages/common', + version: 'local', + }, + '@openfn/language-collections': { + path: '/adaptors/packages/collections', + version: 'local', + }, + }); +}); diff --git a/packages/ws-worker/test/util/create-run-state.test.ts b/packages/ws-worker/test/util/create-run-state.test.ts index 31ad59577..8df4f5cf8 100644 --- a/packages/ws-worker/test/util/create-run-state.test.ts +++ b/packages/ws-worker/test/util/create-run-state.test.ts @@ -9,7 +9,7 @@ const createPlan = (jobs: Partial[]) => steps: jobs.map((j) => ({ expression: '.', ...j })), }, options: {}, - } as ExecutionPlan); + } as Required); test('create run', (t) => { const plan = createPlan([{ id: 'a' }]); diff --git a/packages/ws-worker/test/worker.test.ts b/packages/ws-worker/test/worker.test.ts index 19f4fc5c6..5a18f79d7 100644 --- a/packages/ws-worker/test/worker.test.ts +++ b/packages/ws-worker/test/worker.test.ts @@ -44,7 +44,7 @@ const execute = async (plan: ExecutionPlan, input = {}, options = {}) => [STEP_START]: async () => true, [RUN_LOG]: async (_evt) => { //console.log(evt.source, evt.message) - return true + return true; }, [STEP_COMPLETE]: async () => true, [RUN_COMPLETE]: async () => 
true, @@ -60,7 +60,6 @@ const execute = async (plan: ExecutionPlan, input = {}, options = {}) => doExecute(channel, engine, logger, plan, input, options, onFinish); }); - // Repro for https://github.com/OpenFn/kit/issues/616 // This will not run in CI unless the env is set if (process.env.OPENFN_TEST_SF_TOKEN && process.env.OPENFN_TEST_SF_PASSWORD) { @@ -80,21 +79,21 @@ if (process.env.OPENFN_TEST_SF_TOKEN && process.env.OPENFN_TEST_SF_PASSWORD) { "email": "test@test.com" }]) )`, - adaptor: '@openfn/language-salesforce@4.5.0', + adaptors: ['@openfn/language-salesforce@4.5.0'], configuration: { username: 'demo@openfn.org', securityToken: process.env.OPENFN_TEST_SF_TOKEN, password: process.env.OPENFN_TEST_SF_PASSWORD, loginUrl: 'https://login.salesforce.com', - } + }, }); const input = { data: { result: 42 } }; - const result= await execute(plan, input); - t.log(result) + const result = await execute(plan, input); + t.log(result); // Actually this fails right but it's a permissions thing on the sandbox t.is(result.reason.reason, 'success'); - }) -} \ No newline at end of file + }); +}