Skip to content

Commit

Permalink
Merge pull request #803 from OpenFn/global-collections
Browse files Browse the repository at this point in the history
Global credentials
  • Loading branch information
josephjclark authored Oct 22, 2024
2 parents 71edb8b + 4e659a8 commit 5b2b1f4
Show file tree
Hide file tree
Showing 17 changed files with 242 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/eleven-cougars-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

Support collections
5 changes: 5 additions & 0 deletions .changeset/healthy-laws-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/runtime': major
---

Support global credential object on a workflow
5 changes: 5 additions & 0 deletions .changeset/loud-crabs-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Append the collections adaptor to steps that need it
4 changes: 4 additions & 0 deletions packages/lexicon/core.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export type Workflow = {
name?: string;

steps: Array<Job | Trigger>;

// global credentials
// (gets applied to every configuration object)
credentials?: Record<string, any>;
};

/**
Expand Down
3 changes: 3 additions & 0 deletions packages/runtime/src/execute/compile-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ export default (plan: ExecutionPlan) => {
start: options.start ?? workflow.steps[0]?.id!,
},
};
if (workflow.credentials) {
newPlan.workflow.credentials = workflow.credentials;
}

const maybeAssign = (a: any, b: any, keys: Array<keyof Job>) => {
keys.forEach((key) => {
Expand Down
12 changes: 8 additions & 4 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const executeStep = async (
step: CompiledStep,
input: State = {}
): Promise<{ next: StepId[]; state: any }> => {
const { opts, notify, logger, report } = ctx;
const { opts, notify, logger, report, plan } = ctx;

const duration = Date.now();

Expand All @@ -104,12 +104,16 @@ const executeStep = async (
opts.callbacks?.resolveCredential! // cheat - we need to handle the error case here
);

const globals = await loadState(
const globalState = await loadState(
job,
opts.callbacks?.resolveState! // and here
);

const state = assembleState(clone(input), configuration, globals);
const state = assembleState(
clone(input),
configuration,
globalState,
plan.workflow?.credentials
);

notify(NOTIFY_INIT_COMPLETE, {
jobId,
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export type Lazy<T> = string | T;
export type CompiledExecutionPlan = {
workflow: {
steps: Record<StepId, CompiledStep>;
credentials?: Record<string, any>;
};
options: WorkflowOptions & {
start: StepId;
Expand Down
9 changes: 3 additions & 6 deletions packages/runtime/src/util/assemble-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const assembleData = (initialData: any, defaultData = {}) => {
const assembleState = (
initialState: any = {}, // previous or initial state
configuration = {},
defaultState: any = {} // This is default state provided by the job
defaultState: any = {}, // This is default state provided by the job
globalCredentials: any = {}
) => {
const obj = {
...defaultState,
Expand All @@ -29,11 +30,7 @@ const assembleState = (
}

Object.assign(obj, {
configuration: Object.assign(
{},
initialState.configuration ?? {},
configuration
),
configuration: Object.assign({}, globalCredentials, configuration),
data: assembleData(initialState.data, defaultState.data),
});

Expand Down
14 changes: 14 additions & 0 deletions packages/runtime/test/execute/compile-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ const planWithEdge = (edge: Partial<StepEdge>) => ({
},
});

test('should preserve global credentials ', (t) => {
const compiledPlan = compilePlan({
id: 'a',
workflow: {
steps: [{ id: 'a', expression: 'a' }],
credentials: { collections_token: 'j.w.t.' },
},
});

t.deepEqual(compiledPlan.workflow.credentials, {
collections_token: 'j.w.t.',
});
});

test('should preserve the start option', (t) => {
const compiledPlan = compilePlan({
id: 'a',
Expand Down
54 changes: 34 additions & 20 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import {
} from '../src';
import run from '../src/runtime';

type ExecutionPlanNoOptions = Omit<ExecutionPlan, 'options'>;

test('run simple expression', async (t) => {
const expression = 'export default [(s) => {s.data.done = true; return s}]';

Expand All @@ -22,7 +20,7 @@ test('run simple expression', async (t) => {
});

test('run a simple workflow', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ expression: 'export default [(s) => ({ data: { done: true } })]' },
Expand All @@ -34,6 +32,22 @@ test('run a simple workflow', async (t) => {
t.true(result.data.done);
});

test('run a workflow with global config', async (t) => {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [(s) => state.configuration];' }],
credentials: {
collection_token: 'j.w.t',
},
},
};

const result: any = await run(plan);
t.deepEqual(result, {
collection_token: 'j.w.t',
});
});

test('run a workflow and notify major events', async (t) => {
const counts: Record<string, number> = {};
const notify = (name: string) => {
Expand All @@ -47,7 +61,7 @@ test('run a workflow and notify major events', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [(s) => s]' }],
},
Expand Down Expand Up @@ -76,7 +90,7 @@ test('notify job error even after fail', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ id: 'a', expression: 'export default [(s) => s.data.x = s.err.z ]' },
Expand All @@ -102,7 +116,7 @@ test('notify job error even after crash', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: { steps: [{ id: 'a', expression: 'export default [() => s]' }] },
};

Expand Down Expand Up @@ -139,7 +153,7 @@ test('resolve a credential', async (t) => {
});

test('resolve initial state', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -174,7 +188,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ id: 'a', expression: 'export default [(s) => s]', next: { b: true } },
Expand All @@ -192,7 +206,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => {
});

test('run a workflow with state and parallel branching', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -239,7 +253,7 @@ test('run a workflow with state and parallel branching', async (t) => {
});

test('run a workflow with a leaf step called multiple times', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -294,7 +308,7 @@ test('run a workflow with a leaf step called multiple times', async (t) => {
// TODO this test sort of shows why input state on the plan object is a bit funky
// running the same plan with two inputs is pretty clunky
test('run a workflow with state and conditional branching', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -422,7 +436,7 @@ test('run a workflow with a start and end', async (t) => {
});

test('run a workflow with a trigger node', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -442,7 +456,7 @@ test('run a workflow with a trigger node', async (t) => {
});

test('prefer initial state to inline state', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -464,7 +478,7 @@ test('prefer initial state to inline state', async (t) => {
});

test('Allow a job to return undefined', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [() => {}]' }],
},
Expand All @@ -475,7 +489,7 @@ test('Allow a job to return undefined', async (t) => {
});

test('log errors, write to state, and continue', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -507,7 +521,7 @@ test('log errors, write to state, and continue', async (t) => {
});

test('log job code to the job logger', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -529,7 +543,7 @@ test('log job code to the job logger', async (t) => {
});

test('log and serialize an error to the job logger', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -556,7 +570,7 @@ test('log and serialize an error to the job logger', async (t) => {
});

test('error reports can be overwritten', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -580,7 +594,7 @@ test('error reports can be overwritten', async (t) => {

// This tracks current behaviour but I don't know if it's right
test('stuff written to state before an error is preserved', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -609,7 +623,7 @@ test('data can be an array (expression)', async (t) => {
});

test('data can be an array (workflow)', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down
49 changes: 47 additions & 2 deletions packages/runtime/test/util/assemble-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ test('Initial data does not have to be an object', (t) => {
});
});

test('merges default and initial config objects', (t) => {
test('does not merge default and initial config objects', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { y: 1 };

const result = assembleState(initial, config, defaultState);
t.deepEqual(result, {
configuration: {
x: 1,
y: 1,
},
data: {},
Expand All @@ -95,3 +94,49 @@ test('configuration overrides initialState.configuration', (t) => {
data: {},
});
});

test('global credentials should be added', (t) => {
const initial = {};
const defaultState = undefined;
const config = undefined;
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
collection_token: 'j.w.t',
},
data: {},
});
});

test('global credentials should be merged in', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { x: 2 };
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
x: 2,
collection_token: 'j.w.t',
},
data: {},
});
});

test('local credentials should override global credentials', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { collection_token: 'x.y.z' };
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
collection_token: 'x.y.z',
},
data: {},
});
});
11 changes: 11 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# ws-worker

## 1.7.1

### Patch Changes

- 1c79dc1: Append the collections adaptor to steps that need it
- b15f151: Update worker to use adaptors as an array on xplans. Internal only change.
- Updated dependencies [3463ff9]
- Updated dependencies [7245bf7]
- @openfn/runtime@2.0.0
- @openfn/engine-multi@1.4.0

## 1.7.0

### Minor Changes
Expand Down
Loading

0 comments on commit 5b2b1f4

Please sign in to comment.