From 4376f7b3e615c0b0144deecf6d65d4a03dbb3abf Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 21 Nov 2024 17:44:02 +0000 Subject: [PATCH] Runtime: improve order of state tidyups * Runtime: move cleaning of state from expression to step 1. Moves the cleaning of state after a job expression has been executed to the step level 2. Moves tests for state cleaning to step 3. Updates step state cloning to use safe-stringify * Runtime: update job success/error log wording * Runtime: update changelog * versions --------- Co-authored-by: Farhan Yahaya --- integration-tests/execute/CHANGELOG.md | 7 + integration-tests/execute/package.json | 2 +- integration-tests/worker/CHANGELOG.md | 8 + integration-tests/worker/package.json | 2 +- packages/cli/CHANGELOG.md | 7 + packages/cli/package.json | 2 +- packages/engine-multi/CHANGELOG.md | 7 + packages/engine-multi/package.json | 2 +- packages/lightning-mock/CHANGELOG.md | 8 + packages/lightning-mock/package.json | 2 +- packages/runtime/CHANGELOG.md | 6 + packages/runtime/package.json | 2 +- packages/runtime/src/execute/expression.ts | 43 +---- packages/runtime/src/execute/step.ts | 43 ++++- packages/runtime/src/util/clone.ts | 3 +- .../runtime/test/execute/expression.test.ts | 116 ------------- packages/runtime/test/execute/plan.test.ts | 13 +- packages/runtime/test/execute/step.test.ts | 153 +++++++++++++++++- packages/runtime/test/runtime.test.ts | 2 +- packages/ws-worker/CHANGELOG.md | 8 + packages/ws-worker/package.json | 2 +- 21 files changed, 260 insertions(+), 178 deletions(-) diff --git a/integration-tests/execute/CHANGELOG.md b/integration-tests/execute/CHANGELOG.md index d893fbef9..4568707b6 100644 --- a/integration-tests/execute/CHANGELOG.md +++ b/integration-tests/execute/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/integration-tests-execute +## 1.0.8 + +### Patch Changes + +- Updated dependencies [f6bd593] + - @openfn/runtime@1.5.2 + ## 1.0.7 ### Patch Changes diff --git a/integration-tests/execute/package.json b/integration-tests/execute/package.json index 8c3e2058c..cec8424a1 100644 --- a/integration-tests/execute/package.json +++ b/integration-tests/execute/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-execute", "private": true, - "version": "1.0.7", + "version": "1.0.8", "description": "Job execution tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index c1fc62ddd..324f17ad0 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/integration-tests-worker +## 1.0.66 + +### Patch Changes + +- @openfn/engine-multi@1.4.2 +- @openfn/lightning-mock@2.0.23 +- @openfn/ws-worker@1.8.3 + ## 1.0.65 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 61c1d2193..4e669f13f 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.65", + "version": "1.0.66", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 435f622d6..b591f9000 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/cli +## 1.8.9 + +### Patch Changes + +- Updated dependencies [f6bd593] + - @openfn/runtime@1.5.2 + ## 1.8.8 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index e8bb0051a..23d495429 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.8.8", + "version": "1.8.9", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index a830ebeb4..faa2e912a 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,12 @@ # engine-multi +## 1.4.2 + +### Patch Changes + +- Updated dependencies [f6bd593] + - @openfn/runtime@1.5.2 + ## 1.4.1 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index dfad91e0c..f038538b1 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.4.1", + "version": "1.4.2", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index e3ad44a55..a4ea4b29e 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.0.23 + +### Patch Changes + +- Updated dependencies [f6bd593] + - @openfn/runtime@1.5.2 + - @openfn/engine-multi@1.4.2 + ## 2.0.22 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 1ca96088a..bae196075 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.0.22", + "version": "2.0.23", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 5724c4a2f..ae92876c1 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.5.2 + +### Patch Changes + +- f6bd593: Move cleaning of state from expression to step, resulting in clearer logs. + ## 1.5.1 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 51edcc891..ed91c93f1 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.5.1", + "version": "1.5.2", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/expression.ts b/packages/runtime/src/execute/expression.ts index 0cdaf3e44..64bc694f7 100644 --- a/packages/runtime/src/execute/expression.ts +++ b/packages/runtime/src/execute/expression.ts @@ -1,5 +1,4 @@ import { printDuration, Logger } from '@openfn/logger'; -import stringify from 'fast-safe-stringify'; import type { Operation, State } from '@openfn/lexicon'; import loadModule from '../modules/module-loader'; @@ -76,20 +75,9 @@ export default ( duration = Date.now() - duration; - const finalState = prepareFinalState( - result, - logger, - opts.statePropsToRemove - ); - // return the final state - resolve(finalState); + resolve(result); } catch (e: any) { // whatever initial state looks like now, clean it and report it back - const finalState = prepareFinalState( - input, - logger, - opts.statePropsToRemove - ); duration = Date.now() - duration; let finalError; try { @@ -103,7 +91,7 @@ export default ( finalError = e; } - reject({ state: finalState, error: finalError } as ExecutionErrorWrapper); + reject({ state: input, error: finalError } as ExecutionErrorWrapper); } }); @@ -167,30 +155,3 @@ const prepareJob = async ( return { operations: expression as Operation[] }; } }; - -// TODO this is suboptimal and may be slow on large objects -// (especially as the result get stringified again downstream) -const prepareFinalState = ( - state: any, - logger: Logger, - statePropsToRemove?: string[] -) => { - if (state) { - if (!statePropsToRemove) { - // As a strict default, remove the configuration key - // tbh this should happen higher up in the stack but it causes havoc in unit testing - statePropsToRemove = ['configuration']; - } - - statePropsToRemove.forEach((prop) => { - if (state.hasOwnProperty(prop)) { - delete state[prop]; - logger.debug(`Removed ${prop} from final state`); - } - }); - - const cleanState = stringify(state); - return JSON.parse(cleanState); - } - return state; -}; diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 2ee45155e..b9c8e732b 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -15,6 +15,7 @@ import { NOTIFY_JOB_ERROR, NOTIFY_JOB_START, } from '../events'; +import stringify from 'fast-safe-stringify'; const loadCredentials = async ( job: Job, @@ -72,6 +73,36 @@ const calculateNext = (job: CompiledStep, result: any, logger: Logger) => { return next; }; +// TODO this is suboptimal and may be slow on large objects +// (especially as the result get stringified again downstream) +const prepareFinalState = ( + state: any, + logger: Logger, + statePropsToRemove?: string[] +) => { + if (state) { + if (!statePropsToRemove) { + // As a strict default, remove the configuration key + // tbh this should happen higher up in the stack but it causes havoc in unit testing + statePropsToRemove = ['configuration']; + } + + const removedProps: string[] = []; + statePropsToRemove.forEach((prop) => { + if (state.hasOwnProperty(prop)) { + delete state[prop]; + removedProps.push(prop); + } + }); + logger.debug( + `Cleaning up state. Removing keys: ${removedProps.join(', ')}` + ); + + const cleanState = stringify(state); + return JSON.parse(cleanState); + } + return state; +}; // The job handler is responsible for preparing the job // and working out where to go next // it'll resolve credentials and state and notify how long init took @@ -138,13 +169,16 @@ const executeStep = async ( } catch (e: any) { didError = true; if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) { - const { error, state } = e as ExecutionErrorWrapper; + const { error, state: errState } = e as ExecutionErrorWrapper; + let state = errState; + const duration = logger.timer(timerId); + logger.error(`${jobName} aborted with error (${duration})`); + + state = prepareFinalState(state, logger, ctx.opts.statePropsToRemove); // Whatever the final state was, save that as the intial state to the next thing result = state; - const duration = logger.timer(timerId); - logger.error(`Failed step ${jobName} after ${duration}`); report(state, jobId, error); next = calculateNext(step, result, logger); @@ -169,7 +203,8 @@ const executeStep = async ( if (!didError) { const humanDuration = logger.timer(timerId); - logger.success(`Completed step ${jobName} in ${humanDuration}`); + logger.success(`${jobName} completed in ${humanDuration}ms`); + result = prepareFinalState(result, logger, ctx.opts.statePropsToRemove); // Take a memory snapshot // IMPORTANT: this runs _after_ the state object has been serialized diff --git a/packages/runtime/src/util/clone.ts b/packages/runtime/src/util/clone.ts index 408f108a6..0920f7106 100644 --- a/packages/runtime/src/util/clone.ts +++ b/packages/runtime/src/util/clone.ts @@ -1,5 +1,6 @@ import type { State } from '@openfn/lexicon'; +import stringify from 'fast-safe-stringify'; // TODO I'm in the market for the best solution here - immer? deep-clone? // What should we do if functions are in the state? -export default (state: State) => JSON.parse(JSON.stringify(state)); +export default (state: State) => JSON.parse(stringify(state)); diff --git a/packages/runtime/test/execute/expression.test.ts b/packages/runtime/test/execute/expression.test.ts index 01dc93063..1ecd02d5d 100644 --- a/packages/runtime/test/execute/expression.test.ts +++ b/packages/runtime/test/execute/expression.test.ts @@ -83,122 +83,6 @@ test.serial('jobs can handle a promise', async (t) => { t.deepEqual(state, result); }); -test.serial('output state should be serializable', async (t) => { - const job = [async (s: State) => s]; - - const circular = {}; - circular.self = circular; - - const state = createState({ - circular, - fn: () => {}, - }); - - const context = createContext(); - - const result = await execute(context, job, state); - - t.notThrows(() => JSON.stringify(result)); - - t.is(result.data.circular.self, '[Circular]'); - t.falsy(result.data.fn); -}); - -test.serial( - 'configuration is removed from the result by default', - async (t) => { - const job = [async (s: State) => s]; - const context = createContext(); - - const result = await execute(context, job, { configuration: {} }); - t.deepEqual(result, {}); - } -); - -test.serial( - 'statePropsToRemove removes multiple props from state', - async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x', 'y']; - const context = createContext({}, { statePropsToRemove }); - - const result = await execute(context, job, { x: 1, y: 1, z: 1 }); - t.deepEqual(result, { z: 1 }); - } -); - -test.serial( - 'statePropsToRemove logs to debug when a prop is removed', - async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x']; - - const context = createContext({}, { statePropsToRemove }); - - const result = await execute(context, job, { x: 1, y: 1, z: 1 }); - t.deepEqual(result, { y: 1, z: 1 }); - - const log = logger._find('debug', /removed x from final state/i); - t.truthy(log); - } -); - -test.serial( - 'no props are removed from state if an empty array is passed to statePropsToRemove', - async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = ['x', 'y']; - const context = createContext({}, { statePropsToRemove }); - - const state = { x: 1, configuration: 1 }; - const result = await execute(context, job, state as any); - t.deepEqual(result, state); - } -); - -test.serial( - 'no props are removed from state if a falsy value is passed to statePropsToRemove', - async (t) => { - const job = [async (s: State) => s]; - const statePropsToRemove = undefined; - const context = createContext({}, { statePropsToRemove }); - - const state = { x: 1, configuration: 1 }; - const result = await execute(context, job, state as any); - t.deepEqual(result, state); - } -); - -test.serial('config is removed from the result', async (t) => { - const job = [async (s: State) => s]; - const context = createContext({ opts: {} }); - - const result = await execute(context, job, { configuration: {} }); - t.deepEqual(result, {}); -}); - -test.serial( - 'output state is returned verbatim, apart from config', - async (t) => { - const state = { - data: {}, - references: [], - configuration: {}, - x: true, - }; - const job = [async () => ({ ...state })]; - - const context = createContext(); - - const result = await execute(context, job, {}); - t.deepEqual(result, { - data: {}, - references: [], - x: true, - }); - } -); - test.serial('operations run in series', async (t) => { const job = [ (s: TestState) => { diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index b305f3648..083f6bdc1 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -919,9 +919,10 @@ test('log appropriately on error', async (t) => { const logger = createMockLogger(undefined, { level: 'debug' }); await executePlan(plan, {}, {}, logger); - const err = logger._find('error', /failed step/i); + const err = logger._find('error', /aborted with error/i); t.truthy(err); - t.regex(err!.message as string, /Failed step job1 after \d+ms/i); + console.log('msg:', err?.message); + t.regex(err!.message as string, /job1 aborted with error \(\d+ms\)/i); t.truthy(logger._find('error', /Check state.errors.job1 for details/i)); @@ -1183,8 +1184,8 @@ test('Plans log step ids for each job start and end', async (t) => { const start = logger._find('info', /starting step a/i); t.is(start!.message, 'Starting step a'); - const end = logger._find('success', /completed step a/i); - t.regex(end!.message as string, /Completed step a in \d+ms/); + const end = logger._find('success', /completed in/i); + t.regex(end!.message as string, /a completed in \d+ms/); }); test('Plans log step names for each job start and end', async (t) => { @@ -1201,6 +1202,6 @@ test('Plans log step names for each job start and end', async (t) => { const start = logger._find('info', /starting step do-the-thing/i); t.is(start!.message, 'Starting step do-the-thing'); - const end = logger._find('success', /completed step do-the-thing/i); - t.regex(end!.message as string, /Completed step do-the-thing in \d+ms/); + const end = logger._find('success', /do-the-thing completed in/i); + t.regex(end!.message as string, /do-the-thing completed in \d+ms/); }); diff --git a/packages/runtime/test/execute/step.test.ts b/packages/runtime/test/execute/step.test.ts index 39e2f5866..216f9b6b5 100644 --- a/packages/runtime/test/execute/step.test.ts +++ b/packages/runtime/test/execute/step.test.ts @@ -263,9 +263,9 @@ test.serial('log duration of execution', async (t) => { await execute(context, step, initialState); - const duration = logger._find('success', /completed step /i); + const duration = logger._find('success', /completed in/i); - t.regex(duration?.message, /completed step y in \d\d?ms/i); + t.regex(duration?.message, /y completed in \d\d?ms/i); }); test.serial('log memory usage', async (t) => { @@ -340,3 +340,152 @@ test.serial( t.falsy(warn); } ); + +test.serial('output state should be serializable', async (t) => { + const job = [async (s: State) => s]; + + const step = { + id: 'k', + expression: job, + }; + + const circular = {}; + circular.self = circular; + + const state = createState({ + circular, + fn: () => {}, + }); + + const context = createContext(); + + const result = await execute(context, step, state); + + t.notThrows(() => JSON.stringify(result)); + + t.is(result.state.data.circular.self, '[Circular]'); + t.falsy(result.state.data.fn); +}); + +test.serial( + 'configuration is removed from the result by default', + async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const context = createContext(); + + const result = await execute(context, step, { configuration: {} }); + t.deepEqual(result.state, { data: {} }); + } +); + +test.serial( + 'statePropsToRemove removes multiple props from state', + async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const statePropsToRemove = ['x', 'y']; + const context = createContext({ opts: { statePropsToRemove } }); + + const result = await execute(context, step, { x: 1, y: 1, z: 1 }); + t.deepEqual(result.state, { data: {}, z: 1, configuration: {} }); + } +); + +test.serial( + 'statePropsToRemove logs to debug when a prop is removed', + async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const statePropsToRemove = ['x']; + + const context = createContext({ opts: { statePropsToRemove } }); + + const result = await execute(context, step, { x: 1, y: 1, z: 1 }); + t.deepEqual(result.state, { data: {}, y: 1, z: 1, configuration: {} }); + + const log = logger._find('debug', /Cleaning up state. Removing keys: x/i); + t.truthy(log); + } +); + +test.serial( + 'no props are removed from state if an empty array is passed to statePropsToRemove', + async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const statePropsToRemove: string[] = []; + const context = createContext({ opts: { statePropsToRemove } }); + + const state = { x: 1, configuration: 1 }; + const result = await execute(context, step, state as any); + t.deepEqual(result.state, { x: 1, configuration: {}, data: {} }); + } +); + +test.serial( + 'no props are removed from state if a falsy value is passed to statePropsToRemove', + async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const statePropsToRemove = undefined; + const context = createContext({ opts: { statePropsToRemove } }); + + const state = { x: 1, configuration: 1 }; + const result = await execute(context, step, state as any); + t.deepEqual(result.state, { x: 1, data: {} }); + } +); + +test.serial('config is removed from the result', async (t) => { + const job = [async (s: State) => s]; + const step = { + id: 'k', + expression: job, + }; + const context = createContext({ opts: {} }); + + const result = await execute(context, step, { configuration: {} }); + t.deepEqual(result.state, { data: {} }); +}); + +test.serial( + 'output state is returned verbatim, apart from config', + async (t) => { + const state = { + data: {}, + references: [], + configuration: {}, + x: true, + }; + const job = [async () => ({ ...state })]; + const step = { + id: 'k', + expression: job, + }; + + const context = createContext(); + + const result = await execute(context, step, {}); + t.deepEqual(result.state, { + data: {}, + references: [], + x: true, + }); + } +); diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index a55c5190d..719aa9db4 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -517,7 +517,7 @@ test('log errors, write to state, and continue', async (t) => { message: 'test', }); - t.truthy(logger._find('error', /failed step a/i)); + t.truthy(logger._find('error', /aborted with error/i)); }); test('log job code to the job logger', async (t) => { diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 4cf3c4345..ff947a30a 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,13 @@ # ws-worker +## 1.8.3 + +### Patch Changes + +- Updated dependencies [f6bd593] + - @openfn/runtime@1.5.2 + - @openfn/engine-multi@1.4.2 + ## 1.8.2 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 3f997016c..d6bfdbf4b 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.8.2", + "version": "1.8.3", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",