Skip to content

Commit

Permalink
worker: create global credential for collections
Browse files Browse the repository at this point in the history
  • Loading branch information
josephjclark committed Oct 22, 2024
1 parent c6d8ac9 commit fd0e499
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/eleven-cougars-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

Support collections
11 changes: 11 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# ws-worker

## 1.7.1

### Patch Changes

- 1c79dc1: Append the collections adaptor to steps that need it
- b15f151: Update worker to use adaptors as an array on xplans. Internal only change.
- Updated dependencies [3463ff9]
- Updated dependencies [7245bf7]
- @openfn/runtime@2.0.0
- @openfn/engine-multi@1.4.0

## 1.7.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.7.0",
"version": "1.7.1",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
9 changes: 9 additions & 0 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Setup collections
if (plan.workflow.credentials?.collections_token) {
plan.workflow.credentials.collections_token = token;
}
if (plan.workflow.credentials?.collections_endpoint) {
plan.workflow.credentials.collections_endpoint =
app.options.lightning;
}

// Default the payload limit if it's not otherwise set on the run options
if (!('payloadLimitMb' in options)) {
options.payloadLimitMb = app.options.payloadLimitMb;
Expand Down
11 changes: 10 additions & 1 deletion packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ const mapTriggerEdgeCondition = (edge: LightningEdge) => {
// This function will look at every step and decide whether the collections adaptor
// should be added to the array
const appendCollectionsAdaptor = (plan: ExecutionPlan) => {
let hasCollections;
plan.workflow.steps.forEach((step) => {
const job = step as Job;
if (job.expression?.match(/(collections\.)/)) {
hasCollections = true;
job.adaptors ??= [];
job.adaptors.push('@openfn/language-collections'); // what about version? Is this safe?
}
});
return hasCollections;
};

// Options which relate to this execution but are not part of the plan
Expand Down Expand Up @@ -182,7 +185,13 @@ export default (
plan.workflow.name = run.name;
}

appendCollectionsAdaptor(plan as ExecutionPlan);
const hasCollections = appendCollectionsAdaptor(plan as ExecutionPlan);
if (hasCollections) {
plan.workflow.credentials = {
collections_token: true,
collections_endpoint: 'https://app.openfn.org',
};
}

return {
plan: plan as ExecutionPlan,
Expand Down
23 changes: 23 additions & 0 deletions packages/ws-worker/test/lightning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,29 @@ test.serial('should run a run which returns initial state', async (t) => {
});
});

test.serial('should run a run with the collections adaptor', async (t) => {
return new Promise((done) => {
const run = {
id: 'run-1',
jobs: [
{
// This should be enough to fake the worker into
// loading the collections machinery
body: 'fn((s) => /* collections.get */ s.configuration)',
},
],
};

lng.waitForResult(run.id).then((result: any) => {
t.is(result.collections_endpoint, urls.lng);
t.is(typeof result.collections_token, 'string');
done();
});

lng.enqueueRun(run);
});
});

// A basic high level integration test to ensure the whole loop works
// This checks the events received by the lightning websocket
test.serial(
Expand Down
23 changes: 23 additions & 0 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,26 @@ test('append the collections adaptor to jobs that use it', (t) => {
t.deepEqual(a.adaptors, ['common']);
t.deepEqual(b.adaptors, ['common', '@openfn/language-collections']);
});

test('append the collections credential to jobs that use it', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
jobs: [
createNode({ id: 'a' }),
createNode({
id: 'b',
body: 'collections.each("c", "k", (state) => state)',
}),
],
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('a', 'b')],
};
const { plan } = convertPlan(run as LightningPlan);

const creds = plan.workflow.credentials;

t.deepEqual(creds, {
collections_token: true,
collections_endpoint: 'https://app.openfn.org',
});
});

0 comments on commit fd0e499

Please sign in to comment.