Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global credentials #803

Merged
merged 5 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eleven-cougars-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

Support collections
5 changes: 5 additions & 0 deletions .changeset/healthy-laws-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/runtime': major
---

Support global credential object on a workflow
5 changes: 5 additions & 0 deletions .changeset/loud-crabs-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Append the collections adaptor to steps that need it
4 changes: 4 additions & 0 deletions packages/lexicon/core.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export type Workflow = {
name?: string;

steps: Array<Job | Trigger>;

// global credentials
// (gets applied to every configuration object)
credentials?: Record<string, any>;
};

/**
Expand Down
3 changes: 3 additions & 0 deletions packages/runtime/src/execute/compile-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ export default (plan: ExecutionPlan) => {
start: options.start ?? workflow.steps[0]?.id!,
},
};
if (workflow.credentials) {
newPlan.workflow.credentials = workflow.credentials;
}

const maybeAssign = (a: any, b: any, keys: Array<keyof Job>) => {
keys.forEach((key) => {
Expand Down
12 changes: 8 additions & 4 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const executeStep = async (
step: CompiledStep,
input: State = {}
): Promise<{ next: StepId[]; state: any }> => {
const { opts, notify, logger, report } = ctx;
const { opts, notify, logger, report, plan } = ctx;

const duration = Date.now();

Expand All @@ -104,12 +104,16 @@ const executeStep = async (
opts.callbacks?.resolveCredential! // cheat - we need to handle the error case here
);

const globals = await loadState(
const globalState = await loadState(
job,
opts.callbacks?.resolveState! // and here
);

const state = assembleState(clone(input), configuration, globals);
const state = assembleState(
clone(input),
configuration,
globalState,
plan.workflow?.credentials
);

notify(NOTIFY_INIT_COMPLETE, {
jobId,
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export type Lazy<T> = string | T;
export type CompiledExecutionPlan = {
workflow: {
steps: Record<StepId, CompiledStep>;
credentials?: Record<string, any>;
};
options: WorkflowOptions & {
start: StepId;
Expand Down
9 changes: 3 additions & 6 deletions packages/runtime/src/util/assemble-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const assembleData = (initialData: any, defaultData = {}) => {
const assembleState = (
initialState: any = {}, // previous or initial state
configuration = {},
defaultState: any = {} // This is default state provided by the job
defaultState: any = {}, // This is default state provided by the job
globalCredentials: any = {}
) => {
const obj = {
...defaultState,
Expand All @@ -29,11 +30,7 @@ const assembleState = (
}

Object.assign(obj, {
configuration: Object.assign(
{},
initialState.configuration ?? {},
configuration
),
configuration: Object.assign({}, globalCredentials, configuration),
data: assembleData(initialState.data, defaultState.data),
});

Expand Down
14 changes: 14 additions & 0 deletions packages/runtime/test/execute/compile-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ const planWithEdge = (edge: Partial<StepEdge>) => ({
},
});

test('should preserve global credentials ', (t) => {
const compiledPlan = compilePlan({
id: 'a',
workflow: {
steps: [{ id: 'a', expression: 'a' }],
credentials: { collections_token: 'j.w.t.' },
},
});

t.deepEqual(compiledPlan.workflow.credentials, {
collections_token: 'j.w.t.',
});
});

test('should preserve the start option', (t) => {
const compiledPlan = compilePlan({
id: 'a',
Expand Down
54 changes: 34 additions & 20 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import {
} from '../src';
import run from '../src/runtime';

type ExecutionPlanNoOptions = Omit<ExecutionPlan, 'options'>;

test('run simple expression', async (t) => {
const expression = 'export default [(s) => {s.data.done = true; return s}]';

Expand All @@ -22,7 +20,7 @@ test('run simple expression', async (t) => {
});

test('run a simple workflow', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ expression: 'export default [(s) => ({ data: { done: true } })]' },
Expand All @@ -34,6 +32,22 @@ test('run a simple workflow', async (t) => {
t.true(result.data.done);
});

test('run a workflow with global config', async (t) => {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [(s) => state.configuration];' }],
credentials: {
collection_token: 'j.w.t',
},
},
};

const result: any = await run(plan);
t.deepEqual(result, {
collection_token: 'j.w.t',
});
});

test('run a workflow and notify major events', async (t) => {
const counts: Record<string, number> = {};
const notify = (name: string) => {
Expand All @@ -47,7 +61,7 @@ test('run a workflow and notify major events', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [(s) => s]' }],
},
Expand Down Expand Up @@ -76,7 +90,7 @@ test('notify job error even after fail', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ id: 'a', expression: 'export default [(s) => s.data.x = s.err.z ]' },
Expand All @@ -102,7 +116,7 @@ test('notify job error even after crash', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: { steps: [{ id: 'a', expression: 'export default [() => s]' }] },
};

Expand Down Expand Up @@ -139,7 +153,7 @@ test('resolve a credential', async (t) => {
});

test('resolve initial state', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -174,7 +188,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => {
notify,
};

const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{ id: 'a', expression: 'export default [(s) => s]', next: { b: true } },
Expand All @@ -192,7 +206,7 @@ test('run a workflow with two jobs and call callbacks', async (t) => {
});

test('run a workflow with state and parallel branching', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -239,7 +253,7 @@ test('run a workflow with state and parallel branching', async (t) => {
});

test('run a workflow with a leaf step called multiple times', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -294,7 +308,7 @@ test('run a workflow with a leaf step called multiple times', async (t) => {
// TODO this test sort of shows why input state on the plan object is a bit funky
// running the same plan with two inputs is pretty clunky
test('run a workflow with state and conditional branching', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -422,7 +436,7 @@ test('run a workflow with a start and end', async (t) => {
});

test('run a workflow with a trigger node', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -442,7 +456,7 @@ test('run a workflow with a trigger node', async (t) => {
});

test('prefer initial state to inline state', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -464,7 +478,7 @@ test('prefer initial state to inline state', async (t) => {
});

test('Allow a job to return undefined', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [{ expression: 'export default [() => {}]' }],
},
Expand All @@ -475,7 +489,7 @@ test('Allow a job to return undefined', async (t) => {
});

test('log errors, write to state, and continue', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -507,7 +521,7 @@ test('log errors, write to state, and continue', async (t) => {
});

test('log job code to the job logger', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -529,7 +543,7 @@ test('log job code to the job logger', async (t) => {
});

test('log and serialize an error to the job logger', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -556,7 +570,7 @@ test('log and serialize an error to the job logger', async (t) => {
});

test('error reports can be overwritten', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand All @@ -580,7 +594,7 @@ test('error reports can be overwritten', async (t) => {

// This tracks current behaviour but I don't know if it's right
test('stuff written to state before an error is preserved', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down Expand Up @@ -609,7 +623,7 @@ test('data can be an array (expression)', async (t) => {
});

test('data can be an array (workflow)', async (t) => {
const plan: ExecutionPlanNoOptions = {
const plan: ExecutionPlan = {
workflow: {
steps: [
{
Expand Down
49 changes: 47 additions & 2 deletions packages/runtime/test/util/assemble-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ test('Initial data does not have to be an object', (t) => {
});
});

test('merges default and initial config objects', (t) => {
test('does not merge default and initial config objects', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { y: 1 };

const result = assembleState(initial, config, defaultState);
t.deepEqual(result, {
configuration: {
x: 1,
y: 1,
},
data: {},
Expand All @@ -95,3 +94,49 @@ test('configuration overrides initialState.configuration', (t) => {
data: {},
});
});

test('global credentials should be added', (t) => {
const initial = {};
const defaultState = undefined;
const config = undefined;
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
collection_token: 'j.w.t',
},
data: {},
});
});

test('global credentials should be merged in', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { x: 2 };
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
x: 2,
collection_token: 'j.w.t',
},
data: {},
});
});

test('local credentials should override global credentials', (t) => {
const initial = { configuration: { x: 1 } };
const defaultState = undefined;
const config = { collection_token: 'x.y.z' };
const global = { collection_token: 'j.w.t' };

const result = assembleState(initial, config, defaultState, global);
t.deepEqual(result, {
configuration: {
collection_token: 'x.y.z',
},
data: {},
});
});
11 changes: 11 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# ws-worker

## 1.7.1

### Patch Changes

- 1c79dc1: Append the collections adaptor to steps that need it
- b15f151: Update worker to use adaptors as an array on xplans. Internal only change.
- Updated dependencies [3463ff9]
- Updated dependencies [7245bf7]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.7.0

### Minor Changes
Expand Down
Loading