Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: warn when an expression doesn't return state #832

Merged
merged 8 commits into from
Dec 3, 2024
Merged
2 changes: 1 addition & 1 deletion packages/cli/test/commands.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ test.serial('run a job with defaults: openfn job.js', async (t) => {
test.serial('run a job which does not return state', async (t) => {
const result = await run('openfn job.js', 'export default [s => {}]');

t.falsy(result);
t.deepEqual(result, {});
});

test.serial('run a workflow', async (t) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/execute/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ test.serial('run a job which does not return state', async (t) => {
expressionPath: '/job.js',
};
const result = await handler(options, logger);
t.falsy(result);
t.deepEqual(result, {});

// Check that no error messages have been logged
t.is(logger._history.length, 0);
Expand Down
29 changes: 15 additions & 14 deletions packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,20 @@ test.serial('trigger workflow-log for job logs', (t) => {

let didLog = false;

api.execute(plan, emptyState).on('workflow-log', (evt) => {
if (evt.name === 'JOB') {
didLog = true;
t.deepEqual(evt.message, JSON.stringify(['hola']));
t.pass('workflow logged');
}
});

api.execute(plan, emptyState).on('workflow-complete', (evt) => {
t.true(didLog);
t.falsy(evt.state.errors);
done();
});
api
.execute(plan, emptyState)
.on('workflow-log', (evt) => {
if (evt.name === 'JOB') {
didLog = true;
t.deepEqual(evt.message, JSON.stringify(['hola']));
t.pass('workflow logged');
}
})
.on('workflow-complete', (evt) => {
t.true(didLog);
t.falsy(evt.state.errors);
done();
});
});
});

Expand Down Expand Up @@ -260,7 +261,7 @@ test.serial('run without error if no state is returned', (t) => {
]);

api.execute(plan, emptyState).on('workflow-complete', ({ state }) => {
t.falsy(state);
t.deepEqual(state, {});

// Ensure there are no error logs
const err = logger._find('error', /./);
Expand Down
24 changes: 21 additions & 3 deletions packages/runtime/src/execute/expression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '../errors';
import type { JobModule, ExecutionContext } from '../types';
import { ModuleInfoMap } from '../modules/linker';
import { checkAndClearNullState, nullState } from '../util/null-state';

export type ExecutionErrorWrapper = {
state: any;
Expand Down Expand Up @@ -51,7 +52,7 @@ export default (
// Create the main reducer function
const reducer = (execute || defaultExecute)(
...operations.map((op, idx) =>
wrapOperation(op, logger, `${idx + 1}`, opts.immutableState)
wrapOperation(op, logger, `${idx + 1}`, opts.immutableState, `${idx}`)
)
);

Expand Down Expand Up @@ -100,13 +101,30 @@ export const wrapOperation = (
fn: Operation,
logger: Logger,
name: string,
immutableState?: boolean
immutableState?: boolean,
prevName?: string
) => {
return async (state: State) => {
logger.debug(`Starting operation ${name}`);
const start = new Date().getTime();
if (checkAndClearNullState(state)) {
logger.warn(`Operation ${name} might fail!`);
logger.warn(
`The previous operation ${prevName} didn't return a state. did you forget?`
);
}
const newState = immutableState ? clone(state) : state;
const result = await fn(newState);

if (typeof fn !== 'function') {
logger.warn(`Are you sure ${name} is an operation?`);
logger.debug(`Operation ${name} isn't a valid operation`);
const duration = printDuration(new Date().getTime() - start);
logger.debug(`Operation ${name} skipped in ${duration}`);
return newState;
}

const result = (await fn(newState)) || nullState();

// TODO should we warn if an operation does not return state?
// the trick is saying WHICH operation without source mapping
const duration = printDuration(new Date().getTime() - start);
Expand Down
7 changes: 4 additions & 3 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ const prepareFinalState = (
removedProps.push(prop);
}
});
logger.debug(
`Cleaning up state. Removing keys: ${removedProps.join(', ')}`
);
if (removedProps.length)
logger.debug(
`Cleaning up state. Removing keys: ${removedProps.join(', ')}`
);

const cleanState = stringify(state);
return JSON.parse(cleanState);
Expand Down
21 changes: 21 additions & 0 deletions packages/runtime/src/util/null-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const NULL_STATE = Symbol('NullState');

// The good thing about using a Symbol is that even if we forget to clean the object.
// it's still represented as {}, because symbols aren't visible as keys
export function nullState() {
return { [NULL_STATE]: true };
}

export function isNullState(state: any) {
return typeof state === 'object' && state[NULL_STATE] === true;
}

export function clearNullState(state: any) {
if (typeof state === 'object') delete state[NULL_STATE];
}

export function checkAndClearNullState(state: any) {
const isNull = isNullState(state);
if (isNull) clearNullState(state);
return isNull;
}
20 changes: 12 additions & 8 deletions packages/runtime/test/execute/expression.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Operation, State } from '@openfn/lexicon';

import execute, { mergeLinkerOptions } from '../../src/execute/expression';
import type { ExecutionContext } from '../../src/types';
import { isNullState } from '../../src/util/null-state';

type TestState = State & {
data: {
Expand Down Expand Up @@ -139,17 +140,20 @@ test.serial('async operations run in series', async (t) => {
t.is(result.data.x, 12);
});

test.serial('jobs can return undefined', async (t) => {
// @ts-ignore violating the operation contract here
const job = [() => undefined] as Operation[];
test.serial(
'jobs return null-state instead of undefined or null',
async (t) => {
// @ts-ignore violating the operation contract here
const job = [() => undefined] as Operation[];

const state = createState() as TestState;
const context = createContext();
const state = createState() as TestState;
const context = createContext();

const result = (await execute(context, job, state, {})) as TestState;
const result = (await execute(context, job, state, {})) as TestState;

t.assert(result === undefined);
});
t.assert(isNullState(result));
}
);

test.serial('jobs can mutate the original state', async (t) => {
const job = [
Expand Down
6 changes: 5 additions & 1 deletion packages/runtime/test/execute/plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,11 @@ test('ignore leaf nodes with no result', async (t) => {
const state = { data: { x: 0 } };

const result: any = await executePlan(plan, state, {}, mockLogger);
t.deepEqual(result, { data: { x: 1 } });
t.deepEqual(result, {
a: { data: { x: 1 } },
b: {},
c: {},
});
});

test('isolate state in "parallel" execution', async (t) => {
Expand Down
12 changes: 9 additions & 3 deletions packages/runtime/test/execute/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,10 @@ test.serial('log memory usage', async (t) => {
t.regex(memory?.message, /\d+mb(.+)\d+mb/i);
});

test.serial('warn if a non-leaf step does not return state', async (t) => {
test.serial("warn if a previous step don't return state", async (t) => {
const step = {
id: 'k',
expression: [(s: State) => {}],
expression: [(s: State) => {}, (s: State) => {}],
next: { l: true },
};

Expand All @@ -297,8 +297,14 @@ test.serial('warn if a non-leaf step does not return state', async (t) => {

// @ts-ignore ts complains that the step does not return state
const result = await execute(context, step, state);
const warn = logger._find('warn', /did not return a state object/);
const warn = logger._find('warn', /Operation \d+ might fail!/);
t.truthy(warn);

const warnPrev = logger._find(
'warn',
/The previous operation \d+ didn't return a state. did you forget\?/
);
t.truthy(warnPrev);
});

test.serial('do not warn if a leaf step does not return state', async (t) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,15 +477,15 @@ test('prefer initial state to inline state', async (t) => {
t.is(result.data.y, 20);
});

test('Allow a job to return undefined', async (t) => {
test('Allow a job to return a null-state instead of undefined', async (t) => {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [() => {}]' }],
},
};

const result: any = await run(plan);
t.falsy(result);
t.deepEqual(result, {});
});

test('log errors, write to state, and continue', async (t) => {
Expand Down
15 changes: 8 additions & 7 deletions packages/runtime/test/security.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// a suite of tests with various security concerns in mind
import test from 'ava';
import { createMockLogger } from '@openfn/logger';
import type { ExecutionPlan, State } from '@openfn/lexicon';
import type { ExecutionPlan } from '@openfn/lexicon';

import run from '../src/runtime';

Expand Down Expand Up @@ -64,12 +64,12 @@ test.serial(
);

test.serial('jobs should not have access to global scope', async (t) => {
const src = 'export default [() => globalThis.x]';
const src = 'export default [() => ({x: globalThis.x, y: "some-val"})]';
// @ts-ignore
globalThis.x = 42;

const result: any = await run(src);
t.falsy(result);
t.deepEqual(result, { y: 'some-val' });

// @ts-ignore
delete globalThis.x;
Expand All @@ -90,16 +90,17 @@ test.serial('jobs should be able to mutate global state', async (t) => {
});

test.serial('jobs should each run in their own context', async (t) => {
const src1 = 'export default [() => { globalThis.x = 1; return 1;}]';
const src2 = 'export default [() => globalThis.x]';
const src1 =
'export default [() => { globalThis.x = 1; return { x: globalThis.x }}]';
const src2 = 'export default [() => { return { x: globalThis.x }}]';

await run(src1);

const r1 = (await run(src1)) as any;
t.is(r1, 1);
t.deepEqual(r1, { x: 1 });

const r2 = (await run(src2)) as any;
t.is(r2, undefined);
t.deepEqual(r2, {});
});

test.serial('jobs should not have a process object', async (t) => {
Expand Down