diff --git a/.changeset/swift-dingos-know.md b/.changeset/swift-dingos-know.md
new file mode 100644
index 000000000..936a56b3a
--- /dev/null
+++ b/.changeset/swift-dingos-know.md
@@ -0,0 +1,6 @@
+---
+'@openfn/cli': minor
+'@openfn/deploy': minor
+---
+
+Deploy: allow job body to be loaded from a file path in workflow.yaml
diff --git a/packages/cli/src/pull/handler.ts b/packages/cli/src/pull/handler.ts
index f8087fea7..fab775124 100644
--- a/packages/cli/src/pull/handler.ts
+++ b/packages/cli/src/pull/handler.ts
@@ -6,6 +6,7 @@ import {
   getProject,
   getSpec,
   getStateFromProjectPayload,
+  syncRemoteSpec,
 } from '@openfn/deploy';
 import type { Logger } from '../util/logger';
 import { PullOptions } from '../pull/command';
@@ -47,12 +48,6 @@ async function pullHandler(options: PullOptions, logger: Logger) {
   // Build the state.json
   const state = getStateFromProjectPayload(project!);
 
-  // Write the final project state to disk
-  await fs.writeFile(
-    path.resolve(config.statePath),
-    JSON.stringify(state, null, 2)
-  );
-
   logger.always('Downloading the project spec (as YAML) from the server.');
   // Get the project.yaml from Lightning
   const queryParams = new URLSearchParams();
@@ -85,9 +80,20 @@ async function pullHandler(options: PullOptions, logger: Logger) {
   const resolvedPath = path.resolve(config.specPath);
   logger.debug('reading spec from', resolvedPath);
 
-  // Write the yaml to disk
-  // @ts-ignore
-  await fs.writeFile(resolvedPath, res.body);
+  const updatedSpec = await syncRemoteSpec(
+    await res.text(),
+    state,
+    config,
+    logger
+  );
+
+  // Write the final project state and yaml to disk
+  await fs.writeFile(
+    path.resolve(config.statePath),
+    JSON.stringify(state, null, 2)
+  );
+
+  await fs.writeFile(resolvedPath, updatedSpec);
 
   // Read the spec back in a parsed yaml
   const spec = await getSpec(resolvedPath);
diff --git a/packages/deploy/src/index.ts b/packages/deploy/src/index.ts
index 9a4877688..1c22fbd97 100644
--- a/packages/deploy/src/index.ts
+++ b/packages/deploy/src/index.ts
@@ -10,6 +10,7 @@ import {
   toProjectPayload,
   getStateFromProjectPayload,
 } from './stateTransform';
+import { syncRemoteSpec } from './pull';
 import { deployProject, getProject } from './client';
 import { DeployError } from './deployError';
 import { Logger } from '@openfn/logger';
@@ -33,6 +34,7 @@ export {
   mergeSpecIntoState,
   mergeProjectPayloadIntoState,
   getStateFromProjectPayload,
+  syncRemoteSpec,
 };
 
 export async function getConfig(path?: string): Promise<DeployConfig> {
@@ -91,7 +93,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise<void> {
 export async function getSpec(path: string) {
   try {
     const body = await readFile(path, 'utf8');
-    return parseAndValidate(body);
+    return parseAndValidate(body, path);
   } catch (error: any) {
     if (error.code === 'ENOENT') {
       throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR');
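Note the `getSpec` change above: forwarding `path` into `parseAndValidate` is what lets a relative `body.path` in the spec resolve against the spec file's own directory. A minimal usage sketch (the repo layout and file names here are hypothetical):

```ts
import { getSpec } from '@openfn/deploy';

// After this change, a job declared with `body: { path: ./jobs/fetch.js }`
// in /repo/project.yaml is read relative to /repo, and its content is
// inlined into the parsed doc before deploy builds any state.
const { errors, doc } = await getSpec('/repo/project.yaml');

if (errors.length) {
  // e.g. a 'Failed to read file ./jobs/fetch.js: ...' validation error
  // when the referenced body file is missing
  console.error(errors);
}
```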
diff --git a/packages/deploy/src/pull.ts b/packages/deploy/src/pull.ts
new file mode 100644
index 000000000..4df7b7f11
--- /dev/null
+++ b/packages/deploy/src/pull.ts
@@ -0,0 +1,122 @@
+import YAML, { Pair, Scalar, isPair, isScalar } from 'yaml';
+import { DeployConfig, ProjectState, SpecJob } from './types';
+import { Logger } from '@openfn/logger';
+import { writeFile } from 'fs/promises';
+import path from 'path';
+import { getState, getSpec } from './index';
+
+async function getAllSpecJobs(
+  config: DeployConfig,
+  logger: Logger
+): Promise<SpecJob[]> {
+  const jobs: SpecJob[] = [];
+
+  try {
+    const state = await getState(config.statePath);
+    const spec = await getSpec(config.specPath);
+
+    for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) {
+      if (workflow.jobs) {
+        for (const [jobKey, specJob] of Object.entries(workflow.jobs)) {
+          const stateJob = state.workflows[workflowKey]?.jobs[jobKey];
+          stateJob &&
+            jobs.push({
+              id: stateJob.id,
+              name: specJob.name,
+              adaptor: specJob.adaptor,
+              body: specJob.body,
+            });
+        }
+      }
+    }
+  } catch (error: any) {
+    logger.debug(`Could not read the spec and state: ${error.message}`);
+  }
+
+  return jobs;
+}
+
+async function extractJobsToDisk(
+  specBody: string,
+  state: ProjectState,
+  oldJobs: SpecJob[],
+  config: DeployConfig
+): Promise<string> {
+  function isPairWithScalarKey(
+    node: any
+  ): node is Pair & { key: Scalar & { value: string } } {
+    return (
+      isPair(node) && isScalar(node.key) && typeof node.key.value === 'string'
+    );
+  }
+
+  const doc = YAML.parseDocument(specBody);
+
+  await YAML.visitAsync(doc, {
+    async Pair(_, pair: any, pairPath) {
+      if (
+        !pair.key ||
+        pair.key.value !== 'body' ||
+        !isScalar(pair.value) ||
+        pairPath.length <= 6
+      ) {
+        return;
+      }
+
+      const jobPair = pairPath[pairPath.length - 2];
+      const workflowPair = pairPath[pairPath.length - 6];
+
+      if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) {
+        return;
+      }
+
+      const jobKey = jobPair.key.value;
+      const workflowKey = workflowPair.key.value;
+
+      // find the job in the state
+      const stateJob = state.workflows[workflowKey]?.jobs[jobKey];
+
+      if (!stateJob) {
+        return;
+      }
+
+      // check if the state job is in the old spec jobs
+      const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id);
+
+      if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') {
+        return;
+      }
+
+      const oldSpecJobPath = oldSpecJob.body.path;
+
+      if (oldSpecJobPath) {
+        const basePath = path.dirname(config.specPath);
+        const resolvedPath = path.resolve(basePath, oldSpecJobPath);
+        await writeFile(resolvedPath, pair.value.value);
+
+        // set the body path in the spec
+        const map = doc.createNode({ path: oldSpecJobPath });
+
+        pair.value = map;
+      }
+    },
+  });
+
+  return doc.toString();
+}
+
+export async function syncRemoteSpec(
+  remoteSpecBody: string,
+  newState: ProjectState,
+  config: DeployConfig,
+  logger: Logger
+): Promise<string> {
+  try {
+    const oldSpecJobs = await getAllSpecJobs(config, logger);
+
+    return extractJobsToDisk(remoteSpecBody, newState, oldSpecJobs, config);
+  } catch (error: any) {
+    logger.warn(`Could not update spec job body paths: ${error.message}`);
+    return remoteSpecBody;
+  }
+}
diff --git a/packages/deploy/src/stateTransform.ts b/packages/deploy/src/stateTransform.ts
index 62ca4c21a..d1b11b936 100644
--- a/packages/deploy/src/stateTransform.ts
+++ b/packages/deploy/src/stateTransform.ts
@@ -5,6 +5,7 @@ import {
   ProjectSpec,
   ProjectState,
   SpecEdge,
+  SpecJobBody,
   StateEdge,
   WorkflowSpec,
   WorkflowState,
@@ -20,6 +21,14 @@
 import { DeployError } from './deployError';
 import { Logger } from '@openfn/logger/dist';
 
+function stringifyJobBody(body: SpecJobBody): string {
+  if (typeof body === 'object') {
+    return body.content;
+  } else {
+    return body;
+  }
+}
+
 function mergeJobs(
   stateJobs: WorkflowState['jobs'],
   specJobs: WorkflowSpec['jobs']
@@ -33,7 +42,7 @@
           id: crypto.randomUUID(),
           name: specJob.name,
           adaptor: specJob.adaptor,
-          body: specJob.body,
+          body: stringifyJobBody(specJob.body),
         },
       ];
     }
@@ -49,7 +58,7 @@
           id: stateJob.id,
           name: specJob.name,
           adaptor: specJob.adaptor,
-          body: specJob.body,
+          body: stringifyJobBody(specJob.body),
         },
       ];
     }
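`stringifyJobBody` is the single normalization point: whatever shape the spec used, state (and therefore the deploy payload) only ever stores a plain string body. A small illustration of the two inputs it can see (the values are hypothetical):

```ts
import type { SpecJobBody } from './types';

// Inline body, written directly in workflow.yaml:
const inline: SpecJobBody = 'fn(state => state);';

// File-backed body; the validator fills `content` in at parse time:
const fromFile: SpecJobBody = {
  path: './jobs/fetch.js',
  content: 'fn(state => state);',
};

// stringifyJobBody reduces both to the same string, 'fn(state => state);',
// so mergeJobs never has to care where a body came from.
```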
diff --git a/packages/deploy/src/types.ts b/packages/deploy/src/types.ts
index 754307b8e..a4ea7f52d 100644
--- a/packages/deploy/src/types.ts
+++ b/packages/deploy/src/types.ts
@@ -1,4 +1,4 @@
-export type Job = {
+export type StateJob = {
   id?: string;
   name: string;
   adaptor: string;
@@ -6,6 +6,20 @@ export type Job = {
   delete?: boolean;
 };
 
+export type SpecJobBody =
+  | string
+  | {
+      path?: string;
+      content: string;
+    };
+
+export type SpecJob = {
+  id?: string;
+  name: string;
+  adaptor: string;
+  body: SpecJobBody;
+};
+
 export type Trigger = {
   id?: string;
   type?: string;
@@ -38,7 +52,7 @@ export type SpecEdge = {
 export type WorkflowSpec = {
   id?: string;
   name: string;
-  jobs?: Record<string | number, Job>;
+  jobs?: Record<string | number, SpecJob>;
   triggers?: Record<string | number, Trigger>;
   edges?: Record<string | number, SpecEdge>;
 };
@@ -52,7 +66,7 @@ export interface ProjectSpec {
 export interface WorkflowState {
   id: string;
   name: string;
-  jobs: Record<string | number, Concrete<Job>>;
+  jobs: Record<string | number, Concrete<StateJob>>;
   triggers: Record<string | number, Concrete<Trigger>>;
   edges: Record<string | number, Concrete<StateEdge>>;
   delete?: boolean;
@@ -78,7 +92,7 @@ export interface ProjectPayload {
     id: string;
     name: string;
     project_id?: string;
-    jobs: Concrete<Job>[];
+    jobs: Concrete<StateJob>[];
     triggers: Concrete<Trigger>[];
     edges: Concrete<StateEdge>[];
   }[];
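Because `SpecJobBody` is a union, downstream code has to narrow before touching `path`. A hypothetical guard (not part of this diff) makes the file-backed case explicit:

```ts
import type { SpecJob, SpecJobBody } from './types';

// Hypothetical helper: true when a spec job's body lives in an external file.
function isFileBacked(
  body: SpecJobBody
): body is { path: string; content: string } {
  return typeof body === 'object' && typeof body.path === 'string';
}

// e.g. collect the jobs whose bodies should be re-extracted to disk on pull
const fileBackedJobs = (jobs: SpecJob[]) =>
  jobs.filter((job) => isFileBacked(job.body));
```

This mirrors the `typeof oldSpecJob?.body !== 'object'` check in `pull.ts` above.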
diff --git a/packages/deploy/src/validator.ts b/packages/deploy/src/validator.ts
index 6c93fa90a..79e218ab2 100644
--- a/packages/deploy/src/validator.ts
+++ b/packages/deploy/src/validator.ts
@@ -1,5 +1,7 @@
-import YAML, { YAMLMap, isMap, isPair } from 'yaml';
+import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml';
 import { ProjectSpec } from './types';
+import { readFile } from 'fs/promises';
+import path from 'path';
 
 export interface Error {
   context: any;
@@ -8,12 +10,16 @@
   range?: [number, number, number];
 }
 
-export function parseAndValidate(input: string): {
+export async function parseAndValidate(
+  input: string,
+  specPath: string = '.'
+): Promise<{
   errors: Error[];
   doc: ProjectSpec;
-} {
+}> {
   let errors: Error[] = [];
   const doc = YAML.parseDocument(input);
+  const basePath = path.dirname(specPath);
 
   function ensureUniqueId(key: string, arr: string[]) {
     if (arr.includes(key)) {
@@ -97,8 +103,8 @@
     }
   }
 
-  YAML.visit(doc, {
-    Pair(_, pair: any) {
+  await YAML.visitAsync(doc, {
+    async Pair(_, pair: any, pairPath) {
       if (pair.key && pair.key.value === 'workflows') {
         if (pair.value.value === null) {
           errors.push({
@@ -124,6 +130,37 @@
         }
       }
 
+      if (
+        pair.key &&
+        pair.key.value === 'body' &&
+        pairPath.length > 4 &&
+        isMap(pair.value)
+      ) {
+        const pathValue = pair.value.get('path');
+        const grandPair = pairPath[pairPath.length - 4];
+
+        if (
+          isPair(grandPair) &&
+          isScalar(grandPair.key) &&
+          grandPair.key.value === 'jobs' &&
+          typeof pathValue === 'string'
+        ) {
+          const filePath = path.resolve(basePath, pathValue);
+          try {
+            const content = await readFile(filePath, 'utf8');
+            pair.value.set('content', content);
+          } catch (error: any) {
+            errors.push({
+              path: `job/body/path`,
+              context: pair,
+              message: `Failed to read file ${pathValue}: ${error.message}`,
+              range: pair.value.range,
+            });
+          }
+          return undefined;
+        }
+      }
+
       if (pair.key && pair.key.value === 'condition_expression') {
         if (typeof pair.value.value !== 'string') {
           pair.value.value = String(pair.value.value);
diff --git a/packages/deploy/test/fixtures.ts b/packages/deploy/test/fixtures.ts
index a639f7b2a..c97ed0182 100644
--- a/packages/deploy/test/fixtures.ts
+++ b/packages/deploy/test/fixtures.ts
@@ -11,7 +11,10 @@ export function fullExampleSpec() {
         'job-a': {
           name: 'job a',
           adaptor: '@openfn/language-common@latest',
-          body: '',
+          body: {
+            path: 'somefile.js',
+            content: '',
+          },
         },
         'job-b': {
           name: 'job b',
diff --git a/packages/deploy/test/util.ts b/packages/deploy/test/util.ts
new file mode 100644
index 000000000..1ce2ad371
--- /dev/null
+++ b/packages/deploy/test/util.ts
@@ -0,0 +1,14 @@
+import mock from 'mock-fs';
+import path from 'node:path';
+
+export const mockFs = (files: Record<string, string>) => {
+  const pnpm = path.resolve('../../node_modules/.pnpm');
+  mock({
+    [pnpm]: mock.load(pnpm, {}),
+    ...files,
+  });
+};
+
+export const resetMockFs = () => {
+  mock.restore();
+};
diff --git a/packages/deploy/test/validator.test.ts b/packages/deploy/test/validator.test.ts
index 18cdf6d23..2e7ea582f 100644
--- a/packages/deploy/test/validator.test.ts
+++ b/packages/deploy/test/validator.test.ts
@@ -1,11 +1,14 @@
 import test from 'ava';
 import { parseAndValidate } from '../src/validator';
+import { mockFs, resetMockFs } from './util';
 
 function findError(errors: any[], message: string) {
   return errors.find((e) => e.message === message);
 }
 
-test('Workflows must be a map', (t) => {
+test.after(resetMockFs);
+
+test('Workflows must be a map', async (t) => {
   const doc = `
   name: project-name
   workflows:
@@ -13,7 +16,7 @@
     - name: workflow-two
   `;
 
-  const results = parseAndValidate(doc);
+  const results = await parseAndValidate(doc, 'spec.yaml');
 
   const err = findError(results.errors, 'must be a map');
 
@@ -21,7 +24,7 @@
   t.is(err.path, 'workflows');
 });
 
-test('Workflows must have unique ids', (t) => {
+test('Workflows must have unique ids', async (t) => {
   const doc = `
   name: project-name
   workflows:
@@ -33,14 +36,14 @@
       name: workflow three
   `;
 
-  const results = parseAndValidate(doc);
+  const results = await parseAndValidate(doc, 'spec.yaml');
 
   const err = findError(results.errors, 'duplicate id: workflow-one');
   t.truthy(err);
   t.is(err.path, 'workflow-one');
 });
 
-test('Jobs must have unique ids within a workflow', (t) => {
+test('Jobs must have unique ids within a workflow', async (t) => {
   const doc = `
   name: project-name
   workflows:
@@ -52,14 +55,14 @@
       bar:
   `;
 
-  const results = parseAndValidate(doc);
+  const results = await parseAndValidate(doc, 'spec.yaml');
 
   const err = findError(results.errors, 'duplicate id: foo');
   t.is(err.path, 'workflow-two/foo');
   t.truthy(err);
 });
 
-test('Job ids can duplicate across workflows', (t) => {
+test('Job ids can duplicate across workflows', async (t) => {
   const doc = `
   name: project-name
   workflows:
@@ -73,12 +76,12 @@
       foo:
   `;
 
-  const results = parseAndValidate(doc);
+  const results = await parseAndValidate(doc, 'spec.yaml');
 
   t.is(results.errors.length, 0);
 });
 
-test('Workflow edges are parsed correctly', (t) => {
+test('Workflow edges are parsed correctly', async (t) => {
   const doc = `
   name: project-name
   workflows:
@@ -101,7 +104,7 @@
           condition_expression: true
   `;
 
-  const results = parseAndValidate(doc);
+  const results = await parseAndValidate(doc, 'spec.yaml');
 
   t.assert(
     results.doc.workflows['workflow-one'].edges![
@@ -110,12 +113,12 @@
   );
 });
 
-test('allow empty workflows', (t) => {
+test('allow empty workflows', async (t) => {
   let doc = `
   name: project-name
   `;
 
-  const result = parseAndValidate(doc);
+  const result = await parseAndValidate(doc, 'spec.yaml');
 
   t.is(result.errors.length, 0);
 
@@ -124,3 +127,33 @@
     workflows: {},
   });
 });
+
+test('adds the file content into the job body from the specified path', async (t) => {
+  // Step 1: Create a temporary file that the YAML will reference
+  const fileContent = 'fn(state => state.data);';
+  mockFs({
+    '/jobBody.js': fileContent,
+  });
+
+  // Step 2: YAML document that references the file
+  const doc = `
+  name: project-name
+  workflows:
+    workflow-one:
+      name: workflow one
+      jobs:
+        job-one:
+          name: job one
+          adaptor: '@openfn/language-http@latest'
+          body:
+            path: /jobBody.js
+  `;
+
+  // Step 3: Run the parseAndValidate function
+  const results = await parseAndValidate(doc, 'spec.yaml');
+
+  // Step 4: Assert that the content from the file was merged into the spec
+  const jobBody = results.doc.workflows['workflow-one'].jobs!['job-one'].body;
+
+  t.is(jobBody.content, fileContent);
+});
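Taken together with the validator test above, the round trip is: `deploy` reads `body.path` into `content` before building state, and `pull` writes fresh remote bodies back out to those same files rather than inlining them in the YAML. A hedged sketch of the pull side, mirroring the handler changes in this diff (the `project` payload, file locations, and logger wiring are hypothetical):

```ts
import { promises as fs } from 'node:fs';
import path from 'node:path';
import type { Logger } from '@openfn/logger';
import {
  getConfig,
  getStateFromProjectPayload,
  syncRemoteSpec,
} from '@openfn/deploy';

// remoteYaml is the spec body just fetched from Lightning.
async function persistPulledProject(
  remoteYaml: string,
  project: any,
  logger: Logger
) {
  const config = await getConfig();
  const state = getStateFromProjectPayload(project);

  // Jobs that previously declared `body: { path: ... }` keep the path in the
  // YAML; their fresh remote bodies are written to those files instead.
  const updatedYaml = await syncRemoteSpec(remoteYaml, state, config, logger);

  await fs.writeFile(
    path.resolve(config.statePath),
    JSON.stringify(state, null, 2)
  );
  await fs.writeFile(path.resolve(config.specPath), updatedYaml);
}
```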