From a2349e8811956d8c84576a2f4c998abed2934ecf Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 26 Sep 2024 13:14:42 +0100 Subject: [PATCH] worker: tests on backoff --- packages/ws-worker/test/lightning.test.ts | 108 ++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index f42edef77..42ed02230 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -46,6 +46,10 @@ test.before(async () => { // Note that if this is not passed, // JWT verification will be skipped runPublicKey: keys.public, + backoff: { + min: 1, + max: 1000, + } }); }); @@ -106,6 +110,110 @@ test.serial( } ); +test.serial( + `should not claim while at capacity, then resume`, + (t) => { + return new Promise((done) => { + + let runIsActive = false; + let runComplete = false; + let didClaimAfterComplete = false; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + if (runIsActive) { + t.fail('Claimed while run is active') + } + if (runComplete) { + didClaimAfterComplete = true; + } + }); + + lng.onSocketEvent(e.RUN_START, run.id, () => { + runIsActive = true; + }) + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + runIsActive = false; + runComplete = true; + + setTimeout(() => { + t.true(didClaimAfterComplete); + done() + }, 10) + }); + + lng.enqueueRun(run); + }); + } +); + +test.serial( + `should reset backoff after claim`, + (t) => { + return new Promise((done) => { + + let lastClaim = Date.now() + let lastClaimDiff = 0; + + const run = { + id: `a${++rollingRunId}`, + jobs: [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: `fn(() => new Promise((resolve) => { + setTimeout(resolve, 500) + }))`, + }, + ], + }; + + + lng.on(e.CLAIM, () => { + lastClaimDiff = Date.now() - lastClaim; + lastClaim = Date.now() + }); + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + // set this articially high - if there are no more claims, the test will fail + lastClaimDiff = 10000; + + // When the run is finished, the claims should resume + // but with a smaller backoff + setTimeout(() => { + t.log('Backoff after run:', lastClaimDiff) + t.true(lastClaimDiff < 5) + done() + }, 10) + }); + + + setTimeout(() => { + t.log('Backoff before run:', lastClaimDiff) + // let the backoff increase a bit + // the last claim diff should be at least 30ms + t.true(lastClaimDiff > 30) + + lng.enqueueRun(run); + }, 600) + }); + } +); + test.todo('worker should log when a run token is verified'); // Perhaps a workflow exception is the most responsible thing right now