From fd0e499aa0328eed5098b8d8fbc740baa428cc64 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 22 Oct 2024 17:10:17 +0100 Subject: [PATCH] worker: create global credential for collections --- .changeset/eleven-cougars-exist.md | 5 ++++ packages/ws-worker/CHANGELOG.md | 11 +++++++++ packages/ws-worker/package.json | 2 +- packages/ws-worker/src/server.ts | 9 ++++++++ .../src/util/convert-lightning-plan.ts | 11 ++++++++- packages/ws-worker/test/lightning.test.ts | 23 +++++++++++++++++++ .../test/util/convert-lightning-plan.test.ts | 23 +++++++++++++++++++ 7 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 .changeset/eleven-cougars-exist.md diff --git a/.changeset/eleven-cougars-exist.md b/.changeset/eleven-cougars-exist.md new file mode 100644 index 000000000..53e2817d7 --- /dev/null +++ b/.changeset/eleven-cougars-exist.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': minor +--- + +Support collections diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index cf78dec30..12f3005bd 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,16 @@ # ws-worker +## 1.7.1 + +### Patch Changes + +- 1c79dc1: Append the collections adaptor to steps that need it +- b15f151: Update worker to use adaptors as an array on xplans. Internal only change. +- Updated dependencies [3463ff9] +- Updated dependencies [7245bf7] + - @openfn/runtime@2.0.0 + - @openfn/engine-multi@1.4.0 + ## 1.7.0 ### Minor Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index c10dac7ce..cd0c2a8f3 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.7.0", + "version": "1.7.1", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index c77fb755c..d9562510f 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -202,6 +202,15 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { input, } = await joinRunChannel(app.socket, token, id, logger); + // Setup collections + if (plan.workflow.credentials?.collections_token) { + plan.workflow.credentials.collections_token = token; + } + if (plan.workflow.credentials?.collections_endpoint) { + plan.workflow.credentials.collections_endpoint = + app.options.lightning; + } + // Default the payload limit if it's not otherwise set on the run options if (!('payloadLimitMb' in options)) { options.payloadLimitMb = app.options.payloadLimitMb; diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 49e090276..01eeddcac 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -42,13 +42,16 @@ const mapTriggerEdgeCondition = (edge: LightningEdge) => { // This function will look at every step and decide whether the collections adaptor // should be added to the array const appendCollectionsAdaptor = (plan: ExecutionPlan) => { + let hasCollections; plan.workflow.steps.forEach((step) => { const job = step as Job; if (job.expression?.match(/(collections\.)/)) { + hasCollections = true; job.adaptors ??= []; job.adaptors.push('@openfn/language-collections'); // what about version? Is this safe? } }); + return hasCollections; }; // Options which relate to this execution but are not part of the plan @@ -182,7 +185,13 @@ export default ( plan.workflow.name = run.name; } - appendCollectionsAdaptor(plan as ExecutionPlan); + const hasCollections = appendCollectionsAdaptor(plan as ExecutionPlan); + if (hasCollections) { + plan.workflow.credentials = { + collections_token: true, + collections_endpoint: 'https://app.openfn.org', + }; + } return { plan: plan as ExecutionPlan, diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 4d9b08924..4b508bcce 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -258,6 +258,29 @@ test.serial('should run a run which returns initial state', async (t) => { }); }); +test.serial('should run a run with the collections adaptor', async (t) => { + return new Promise((done) => { + const run = { + id: 'run-1', + jobs: [ + { + // This should be enough to fake the worker into + // loading the collections machinery + body: 'fn((s) => /* collections.get */ s.configuration)', + }, + ], + }; + + lng.waitForResult(run.id).then((result: any) => { + t.is(result.collections_endpoint, urls.lng); + t.is(typeof result.collections_token, 'string'); + done(); + }); + + lng.enqueueRun(run); + }); +}); + // A basic high level integration test to ensure the whole loop works // This checks the events received by the lightning websocket test.serial( diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index e7922c1eb..9f55f6a57 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -609,3 +609,26 @@ test('append the collections adaptor to jobs that use it', (t) => { t.deepEqual(a.adaptors, ['common']); t.deepEqual(b.adaptors, ['common', '@openfn/language-collections']); }); + +test('append the collections credential to jobs that use it', (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ id: 'a' }), + createNode({ + id: 'b', + body: 'collections.each("c", "k", (state) => state)', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('a', 'b')], + }; + const { plan } = convertPlan(run as LightningPlan); + + const creds = plan.workflow.credentials; + + t.deepEqual(creds, { + collections_token: true, + collections_endpoint: 'https://app.openfn.org', + }); +});