From 40f129baca1d46cddbbb69987d65089b132eca80 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 10:27:15 +0000 Subject: [PATCH 01/22] runtime: add tests --- packages/runtime/test/execute/plan.test.ts | 69 ++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index ebfdd5da6..01a142ba8 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -634,6 +634,75 @@ test('execute multiple steps in "parallel"', async (t) => { }); }); +test('isolate state in "parallel" execution', async (t) => { + const plan: ExecutionPlan = { + start: 'start', + initialState: { data: { x: 0 } }, + jobs: [ + { + id: 'start', + expression: 'export default [s => s]', + next: { + b: true, + c: true, + }, + }, + { + id: 'b', + expression: + 'export default [s => { if (s.data.c) { throw "e" }; s.data.b = true; return s }]', + }, + { + id: 'c', + expression: + 'export default [s => { if (s.data.b) { throw "e" }; s.data.c = true; return s }]', + }, + ], + }; + + const result = await execute(plan, {}, mockLogger); + t.falsy(result.errors); +}); + +test('"parallel" execution with multiple leaves should write multiple results to state', async (t) => { + const plan: ExecutionPlan = { + start: 'start', + jobs: [ + { + id: 'start', + expression: 'export default [s => s]', + next: { + 'job-b': true, + 'job-c': true, + }, + }, + { + id: 'job-b', + expression: 'export default [s => { s.data.b = true; return s }]', + }, + { + id: 'job-c', + expression: 'export default [s => { s.data.c = true; return s }]', + }, + ], + }; + + const result = await execute(plan, {}, mockLogger); + // Each leaf should write to its own place on state + t.deepEqual(result, { + 'job-b': { + data: { + b: true, + }, + }, + 'job-c': { + data: { + c: true, + }, + }, + }); +}); + test('return an error in state', async (t) => { const plan: ExecutionPlan = { jobs: [ From e173493c736da92c670217f4471645c44692b7b1 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 10:36:11 +0000 Subject: [PATCH 02/22] runtime: more tests --- packages/runtime/test/execute/plan.test.ts | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index 01a142ba8..1cdd96682 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -664,6 +664,49 @@ test('isolate state in "parallel" execution', async (t) => { t.falsy(result.errors); }); +test('isolate state in "parallel" execution with deeper trees', async (t) => { + const plan: ExecutionPlan = { + start: 'start', + initialState: { data: { x: 0 } }, + jobs: [ + { + id: 'start', + expression: 'export default [s => s]', + next: { + // fudge the order a bit + c: true, + b: true, + }, + }, + { + id: 'c2', + expression: + 'export default [s => { if (s.data.b) { throw "e" }; s.data.c = true; return s }]', + }, + { + id: 'b', + expression: + 'export default [s => { if (s.data.c) { throw "e" }; s.data.b = true; return s }]', + next: { b2: true }, + }, + { + id: 'c', + expression: + 'export default [s => { if (s.data.b) { throw "e" }; s.data.c = true; return s }]', + next: { c2: true }, + }, + { + id: 'b2', + expression: + 'export default [s => { if (s.data.c) { throw "e" }; s.data.b = true; return s }]', + }, + ], + }; + + const result = await execute(plan, {}, mockLogger); + t.falsy(result.errors); +}); + test('"parallel" execution with multiple leaves should write multiple results to state', async (t) => { const plan: ExecutionPlan = { start: 'start', From fd375f6ff5d30efe7056d5550ca41240c5a32516 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 11:43:14 +0000 Subject: [PATCH 03/22] runtime: move job start/complete notification into job.ts --- packages/runtime/src/execute/expression.ts | 13 +-- packages/runtime/src/execute/job.ts | 18 +++- .../runtime/test/execute/expression.test.ts | 66 +------------ packages/runtime/test/execute/job.test.ts | 93 +++++++++++++++++++ 4 files changed, 111 insertions(+), 79 deletions(-) create mode 100644 packages/runtime/test/execute/job.test.ts diff --git a/packages/runtime/src/execute/expression.ts b/packages/runtime/src/execute/expression.ts index 3163897a6..b280fe0d8 100644 --- a/packages/runtime/src/execute/expression.ts +++ b/packages/runtime/src/execute/expression.ts @@ -16,7 +16,6 @@ import { assertRuntimeError, assertSecurityKill, } from '../errors'; -import { NOTIFY_JOB_COMPLETE, NOTIFY_JOB_START } from '../events'; export type ExecutionErrorWrapper = { state: any; @@ -26,12 +25,11 @@ export type ExecutionErrorWrapper = { export default ( ctx: ExecutionContext, expression: string | Operation[], - initialState: State, - id?: string + initialState: State ) => new Promise(async (resolve, reject) => { let duration = Date.now(); - const { logger, notify = () => {}, opts = {} } = ctx; + const { logger, opts = {} } = ctx; try { const timeout = opts.timeout || TIMEOUT; @@ -55,7 +53,6 @@ export default ( // Run the pipeline logger.debug(`Executing expression (${operations.length} operations)`); - notify(NOTIFY_JOB_START, { jobId: id }); const tid = setTimeout(() => { logger.error(`Error: Timeout (${timeout}ms) expired!`); @@ -73,12 +70,6 @@ export default ( const finalState = prepareFinalState(opts, result); - notify(NOTIFY_JOB_COMPLETE, { - duration, - state: finalState, - jobId: id, - }); - // return the final state resolve(finalState); } catch (e: any) { diff --git a/packages/runtime/src/execute/job.ts b/packages/runtime/src/execute/job.ts index 2ef6617a6..b95726140 100644 --- a/packages/runtime/src/execute/job.ts +++ b/packages/runtime/src/execute/job.ts @@ -14,7 +14,9 @@ import { EdgeConditionError } from '../errors'; import { NOTIFY_INIT_COMPLETE, NOTIFY_INIT_START, + NOTIFY_JOB_COMPLETE, NOTIFY_JOB_ERROR, + NOTIFY_JOB_START, } from '../events'; const loadCredentials = async ( @@ -87,9 +89,19 @@ const executeJob = async ( if (job.expression) { const startTime = Date.now(); try { - result = await executeExpression(ctx, job.expression, state, job.id); - const duration = logger.timer('job'); - logger.success(`Completed job ${job.id} in ${duration}`); + // TODO include the upstream job + notify(NOTIFY_JOB_START, { jobId: job.id }); + result = await executeExpression(ctx, job.expression, state); + const humanDuration = logger.timer('job'); + logger.success(`Completed job ${job.id} in ${humanDuration}`); + + // TODO should we also include the downstream jobs here? + // That's a little bit more complicated to work out + notify(NOTIFY_JOB_COMPLETE, { + duration, + state: result, + jobId: job.id, + }); } catch (e: any) { if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) { const { error, state } = e as ExecutionErrorWrapper; diff --git a/packages/runtime/test/execute/expression.test.ts b/packages/runtime/test/execute/expression.test.ts index a282ee974..8cb92d7aa 100644 --- a/packages/runtime/test/execute/expression.test.ts +++ b/packages/runtime/test/execute/expression.test.ts @@ -1,9 +1,8 @@ import test from 'ava'; import { fn } from '@openfn/language-common'; -import type { State, Operation, ExecutionContext } from '../../src/types'; import { createMockLogger } from '@openfn/logger'; import execute from '../../src/execute/expression'; -import { NOTIFY_JOB_COMPLETE, NOTIFY_JOB_START } from '../../src'; +import type { State, Operation, ExecutionContext } from '../../src/types'; type TestState = State & { data: { @@ -67,69 +66,6 @@ test('run a live no-op job with @openfn/language-common.fn', async (t) => { t.deepEqual(state, result); }); -test(`notify ${NOTIFY_JOB_START}`, async (t) => { - let didCallCallback = false; - - const expression = [(s: State) => s]; - const state = createState(); - - const notify = (event: string, payload?: any) => { - if (event === NOTIFY_JOB_START) { - didCallCallback = true; - } - t.is(payload.jobId, 'j'); - }; - - const context = createContext({ notify }); - - await execute(context, expression, state, 'j'); - t.true(didCallCallback); -}); - -test(`notify ${NOTIFY_JOB_COMPLETE}`, async (t) => { - let didCallCallback = false; - - const expression = [(s: State) => s]; - const state = createState(); - - const notify = (event: string, payload: any) => { - if (event === NOTIFY_JOB_COMPLETE) { - const { state, duration, jobId } = payload; - didCallCallback = true; - t.truthy(state); - t.deepEqual(state, state); - t.assert(!isNaN(duration)); - t.is(jobId, 'j'); - } - }; - - const context = createContext({ notify }); - - await execute(context, expression, state, 'j'); - t.true(didCallCallback); -}); - -test(`notify ${NOTIFY_JOB_COMPLETE} should publish serializable state`, async (t) => { - // Promises will trigger an exception if you try to serialize them - // If we don't return finalState in execute/expression, this test will fail - const resultState = { x: new Promise((r) => r), y: 22 }; - const expression = [(s: State) => resultState]; - const state = createState(); - - const notify = (event: string, payload: any) => { - if (event === NOTIFY_JOB_COMPLETE) { - const { state, duration, jobId } = payload; - t.truthy(state); - t.assert(!isNaN(duration)); - t.is(jobId, 'j'); - } - }; - - const context = createContext({ notify }); - - await execute(context, expression, state, 'j'); -}); - test('jobs can handle a promise', async (t) => { const job = [async (s: State) => s]; const state = createState(); diff --git a/packages/runtime/test/execute/job.test.ts b/packages/runtime/test/execute/job.test.ts new file mode 100644 index 000000000..e8d39d71c --- /dev/null +++ b/packages/runtime/test/execute/job.test.ts @@ -0,0 +1,93 @@ +import test from 'ava'; +import { createMockLogger } from '@openfn/logger'; + +import { NOTIFY_JOB_COMPLETE, NOTIFY_JOB_START } from '../../src'; +import execute from '../../src/execute/job'; + +import type { ExecutionContext, State } from '../../src/types'; + +const createState = (data = {}) => ({ + data: data, + configuration: {}, +}); + +const logger = createMockLogger(undefined, { level: 'debug' }); + +const createContext = (args = {}) => + ({ + logger, + plan: {}, + opts: {}, + notify: () => {}, + report: () => {}, + ...args, + } as unknown as ExecutionContext); + +test.afterEach(() => { + logger._reset(); +}); + +test(`notify ${NOTIFY_JOB_START}`, async (t) => { + const job = { + id: 'j', + expression: [(s: State) => s], + }; + const state = createState(); + + const notify = (event: string, payload?: any) => { + if (event === NOTIFY_JOB_START) { + t.is(payload.jobId, 'j'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); +}); + +test(`notify ${NOTIFY_JOB_COMPLETE}`, async (t) => { + const job = { + id: 'j', + expression: [(s: State) => s], + }; + + const state = createState(); + + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_COMPLETE) { + const { state, duration, jobId } = payload; + t.truthy(state); + t.deepEqual(state, state); + t.assert(!isNaN(duration)); + t.is(jobId, 'j'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); +}); + +test(`notify ${NOTIFY_JOB_COMPLETE} should publish serializable state`, async (t) => { + // Promises will trigger an exception if you try to serialize them + // If we don't return finalState in execute/expression, this test will fail + const resultState = { x: new Promise((r) => r), y: 22 }; + const job = { + id: 'j', + expression: [() => resultState], + }; + const state = createState(); + + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_COMPLETE) { + const { state, duration, jobId } = payload; + t.truthy(state); + t.assert(!isNaN(duration)); + t.is(jobId, 'j'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); +}); From 0be8f93da005e0cea4a6a4d40333781bc1b0736b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 12:45:50 +0000 Subject: [PATCH 04/22] runtime: publish next steps with job-complete --- packages/runtime/src/execute/job.ts | 95 ++++++++++++---------- packages/runtime/src/execute/plan.ts | 2 +- packages/runtime/src/types.ts | 51 +++++++++++- packages/runtime/test/execute/job.test.ts | 72 +++++++++++++++- packages/runtime/test/execute/plan.test.ts | 2 +- 5 files changed, 174 insertions(+), 48 deletions(-) diff --git a/packages/runtime/src/execute/job.ts b/packages/runtime/src/execute/job.ts index b95726140..630ad55ca 100644 --- a/packages/runtime/src/execute/job.ts +++ b/packages/runtime/src/execute/job.ts @@ -43,6 +43,35 @@ const loadState = async ( return job.state; }; +const calculateNext = (job: CompiledJobNode, result: any) => { + const next: string[] = []; + if (job.next) { + for (const nextJobId in job.next) { + const edge = job.next[nextJobId]; + if (!edge) { + continue; + } + if (typeof edge === 'object') { + if (edge.disabled || !edge.condition) { + continue; + } + if (typeof edge.condition === 'function') { + try { + if (!edge.condition(result)) { + continue; + } + } catch (e: any) { + throw new EdgeConditionError(e.message); + } + } + } + next.push(nextJobId); + // TODO errors + } + } + return next; +}; + // The job handler is responsible for preparing the job // and working out where to go next // it'll resolve credentials and state and notify how long init took @@ -51,13 +80,13 @@ const executeJob = async ( job: CompiledJobNode, initialState: State = {} ): Promise<{ next: JobNodeID[]; state: any }> => { - const next: string[] = []; - const { opts, notify, logger, report } = ctx; const duration = Date.now(); - notify(NOTIFY_INIT_START); + const jobId = job.id; + + notify(NOTIFY_INIT_START, { jobId }); // lazy load config and state const configuration = await loadCredentials( @@ -77,32 +106,27 @@ const executeJob = async ( opts.strict ); - notify(NOTIFY_INIT_COMPLETE, { duration: Date.now() - duration }); + notify(NOTIFY_INIT_COMPLETE, { jobId, duration: Date.now() - duration }); // We should by this point have validated the plan, so the job MUST exist logger.timer('job'); - logger.always('Starting job', job.id); + logger.always('Starting job', jobId); // The expression SHOULD return state, but COULD return anything let result: any = state; + let next: string[] = []; + let didError = false; if (job.expression) { const startTime = Date.now(); try { // TODO include the upstream job - notify(NOTIFY_JOB_START, { jobId: job.id }); + notify(NOTIFY_JOB_START, { jobId }); result = await executeExpression(ctx, job.expression, state); const humanDuration = logger.timer('job'); - logger.success(`Completed job ${job.id} in ${humanDuration}`); - - // TODO should we also include the downstream jobs here? - // That's a little bit more complicated to work out - notify(NOTIFY_JOB_COMPLETE, { - duration, - state: result, - jobId: job.id, - }); + logger.success(`Completed job ${jobId} in ${humanDuration}`); } catch (e: any) { + didError = true; if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) { const { error, state } = e as ExecutionErrorWrapper; @@ -110,14 +134,17 @@ const executeJob = async ( result = state; const duration = logger.timer('job'); - logger.error(`Failed job ${job.id} after ${duration}`); - report(state, job.id, error); + logger.error(`Failed job ${jobId} after ${duration}`); + report(state, jobId, error); + + next = calculateNext(job, result); notify(NOTIFY_JOB_ERROR, { duration: Date.now() - startTime, error, state, - jobId: job.id, + jobId, + next, }); // Stop executing if the error is sufficiently severe @@ -131,30 +158,16 @@ const executeJob = async ( } } - if (job.next) { - for (const nextJobId in job.next) { - const edge = job.next[nextJobId]; - if (!edge) { - continue; - } - if (typeof edge === 'object') { - if (edge.disabled || !edge.condition) { - continue; - } - if (typeof edge.condition === 'function') { - try { - if (!edge.condition(result)) { - continue; - } - } catch (e: any) { - throw new EdgeConditionError(e.message); - } - } - } - next.push(nextJobId); - // TODO errors - } + if (!didError) { + next = calculateNext(job, result); + notify(NOTIFY_JOB_COMPLETE, { + duration: Date.now() - duration, + state: result, + jobId, + next, + }); } + return { next, state: result }; }; diff --git a/packages/runtime/src/execute/plan.ts b/packages/runtime/src/execute/plan.ts index f8c08a013..b4085d2e3 100644 --- a/packages/runtime/src/execute/plan.ts +++ b/packages/runtime/src/execute/plan.ts @@ -49,7 +49,7 @@ const executePlan = async ( initialState = await opts.callbacks?.resolveState?.(id); const duration = Date.now() - startTime; - opts.callbacks?.notify?.(NOTIFY_STATE_LOAD, { duration, id }); + opts.callbacks?.notify?.(NOTIFY_STATE_LOAD, { duration, jobId: id }); logger.success(`loaded state for ${id} in ${duration}ms`); // TODO catch and re-throw diff --git a/packages/runtime/src/types.ts b/packages/runtime/src/types.ts index ade6063e0..e3bbb52b6 100644 --- a/packages/runtime/src/types.ts +++ b/packages/runtime/src/types.ts @@ -114,13 +114,18 @@ export type JobModule = { // TODO lifecycle hooks }; +type NotifyHandler = ( + event: NotifyEvents, + payload: NotifyEventsLookup[typeof event] +) => void; + // TODO difficulty: this is not the same as a vm execution context export type ExecutionContext = { plan: CompiledExecutionPlan; logger: Logger; opts: Options; report: ErrorReporter; - notify: (evt: NotifyEvents, payload?: any) => void; + notify: NotifyHandler; }; export type NotifyEvents = @@ -131,8 +136,50 @@ export type NotifyEvents = | typeof NOTIFY_JOB_ERROR | typeof NOTIFY_STATE_LOAD; +export type NotifyJobInitStartPayload = { + jobId: string; +}; + +export type NotifyJobInitCompletePayload = { + duration: number; + jobId: string; +}; + +export type NotifyJobCompletePayload = { + duration: number; + state: any; + jobId: string; + next: string[]; +}; + +export type NotifyJobErrorPayload = { + duration: number; + error?: any; // TODO I should be able to do better than this because I have a standard error interface + state: any; + jobId: string; + next: string[]; +}; + +export type NotifyJobStartPayload = { + jobId: string; +}; + +export type NotifyStateLoadPayload = { + jobId: string; + duration: number; +}; + +export type NotifyEventsLookup = { + [NOTIFY_INIT_START]: NotifyJobInitStartPayload; + [NOTIFY_INIT_COMPLETE]: NotifyJobInitCompletePayload; + [NOTIFY_JOB_START]: NotifyJobStartPayload; + [NOTIFY_JOB_COMPLETE]: NotifyJobCompletePayload; + [NOTIFY_JOB_ERROR]: NotifyJobErrorPayload; + [NOTIFY_STATE_LOAD]: NotifyStateLoadPayload; +}; + export type ExecutionCallbacks = { - notify?(event: NotifyEvents, payload: any): void; + notify: NotifyHandler; resolveState?: (stateId: string) => Promise; resolveCredential?: (credentialId: string) => Promise; }; diff --git a/packages/runtime/test/execute/job.test.ts b/packages/runtime/test/execute/job.test.ts index e8d39d71c..0522e19bc 100644 --- a/packages/runtime/test/execute/job.test.ts +++ b/packages/runtime/test/execute/job.test.ts @@ -1,7 +1,11 @@ import test from 'ava'; import { createMockLogger } from '@openfn/logger'; -import { NOTIFY_JOB_COMPLETE, NOTIFY_JOB_START } from '../../src'; +import { + NOTIFY_JOB_COMPLETE, + NOTIFY_JOB_ERROR, + NOTIFY_JOB_START, +} from '../../src'; import execute from '../../src/execute/job'; import type { ExecutionContext, State } from '../../src/types'; @@ -45,7 +49,7 @@ test(`notify ${NOTIFY_JOB_START}`, async (t) => { await execute(context, job, state); }); -test(`notify ${NOTIFY_JOB_COMPLETE}`, async (t) => { +test(`notify ${NOTIFY_JOB_COMPLETE} with no next`, async (t) => { const job = { id: 'j', expression: [(s: State) => s], @@ -55,10 +59,38 @@ test(`notify ${NOTIFY_JOB_COMPLETE}`, async (t) => { const notify = (event: string, payload: any) => { if (event === NOTIFY_JOB_COMPLETE) { - const { state, duration, jobId } = payload; + const { state, duration, jobId, next } = payload; + t.truthy(state); + t.deepEqual(state, state); + t.deepEqual(next, []); + t.assert(!isNaN(duration)); + t.true(duration < 10); + t.is(jobId, 'j'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); +}); + +test(`notify ${NOTIFY_JOB_COMPLETE} with two nexts`, async (t) => { + const job = { + id: 'j', + expression: [(s: State) => s], + next: { b: true, c: true }, + }; + + const state = createState(); + + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_COMPLETE) { + const { state, duration, jobId, next } = payload; t.truthy(state); t.deepEqual(state, state); + t.deepEqual(next, ['b', 'c']); t.assert(!isNaN(duration)); + t.true(duration < 10); t.is(jobId, 'j'); } }; @@ -91,3 +123,37 @@ test(`notify ${NOTIFY_JOB_COMPLETE} should publish serializable state`, async (t await execute(context, job, state); }); + +test(`notify ${NOTIFY_JOB_ERROR} for a fail`, async (t) => { + const job = { + id: 'j', + expression: [ + () => { + throw 'e'; + }, + ], + next: { b: true }, + }; + + const state = createState(); + + const notify = (event: string, payload: any) => { + if (event === NOTIFY_JOB_ERROR) { + const { state, duration, jobId, next, error } = payload; + t.truthy(state); + t.is(error.message, 'e'); + t.is(error.type, 'JobError'); + t.is(error.severity, 'fail'); + + t.deepEqual(state, state); + t.deepEqual(next, ['b']); + t.assert(!isNaN(duration)); + t.true(duration < 10); + t.is(jobId, 'j'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); +}); diff --git a/packages/runtime/test/execute/plan.test.ts b/packages/runtime/test/execute/plan.test.ts index 1cdd96682..9f058e5f6 100644 --- a/packages/runtime/test/execute/plan.test.ts +++ b/packages/runtime/test/execute/plan.test.ts @@ -801,7 +801,7 @@ test('keep executing after an error', async (t) => { t.falsy(result.x); }); -test('simple on-error handler', async (t) => { +test.only('simple on-error handler', async (t) => { const plan: ExecutionPlan = { jobs: [ { From 7f352d2769f86361c096cd490eaf8b247f828a2b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 12:46:28 +0000 Subject: [PATCH 05/22] changeset --- .changeset/shaggy-cups-reply.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/shaggy-cups-reply.md diff --git a/.changeset/shaggy-cups-reply.md b/.changeset/shaggy-cups-reply.md new file mode 100644 index 000000000..9d744371e --- /dev/null +++ b/.changeset/shaggy-cups-reply.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': patch +--- + +Broadcast next steps with job-complete and error events From c8e9d513e259a5409136ee3abb42215062938028 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 14:58:33 +0000 Subject: [PATCH 06/22] engine: make sure next is included in job-complete --- .changeset/chilly-gorillas-poke.md | 5 +++++ packages/engine-multi/src/api/lifecycle.ts | 6 ++++-- packages/engine-multi/src/events.ts | 2 ++ packages/engine-multi/src/worker/events.ts | 2 ++ packages/engine-multi/test/integration.test.ts | 6 +++++- 5 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 .changeset/chilly-gorillas-poke.md diff --git a/.changeset/chilly-gorillas-poke.md b/.changeset/chilly-gorillas-poke.md new file mode 100644 index 000000000..91839e6e2 --- /dev/null +++ b/.changeset/chilly-gorillas-poke.md @@ -0,0 +1,5 @@ +--- +'@openfn/engine-multi': patch +--- + +Forward next from job complete diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index 50516ba37..e876a35a1 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -84,13 +84,14 @@ export const jobComplete = ( context: ExecutionContext, event: internalEvents.JobCompleteEvent ) => { - const { threadId, state, duration, jobId } = event; + const { threadId, state, duration, jobId, next } = event; context.emit(externalEvents.JOB_COMPLETE, { threadId, state, duration, jobId, + next, }); }; @@ -100,7 +101,7 @@ export const jobError = ( context: ExecutionContext, event: internalEvents.JobErrorEvent ) => { - const { threadId, state, error, duration, jobId } = event; + const { threadId, state, error, duration, jobId, next } = event; context.emit(externalEvents.JOB_ERROR, { threadId, @@ -108,6 +109,7 @@ export const jobError = ( error, duration, jobId, + next, }); }; diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index 417c0e3db..eca1cae54 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -70,6 +70,7 @@ export interface JobCompletePayload extends ExternalEvent { jobId: string; duration: number; state: any; // the result state + next: string[]; // downstream jobs } export interface JobErrorPayload extends ExternalEvent { @@ -77,6 +78,7 @@ export interface JobErrorPayload extends ExternalEvent { duration: number; state: any; // the result state error: any; + next: string[]; // downstream jobs } export interface WorkerLogPayload extends ExternalEvent, JSONLog {} diff --git a/packages/engine-multi/src/worker/events.ts b/packages/engine-multi/src/worker/events.ts index 12b0c3253..8ae5fb60c 100644 --- a/packages/engine-multi/src/worker/events.ts +++ b/packages/engine-multi/src/worker/events.ts @@ -44,6 +44,7 @@ export interface JobCompleteEvent extends InternalEvent { jobId: string; state: any; duration: number; + next: string[]; } export interface JobErrorEvent extends InternalEvent { @@ -51,6 +52,7 @@ export interface JobErrorEvent extends InternalEvent { state: any; error: any; // TODO this should be one of our errors duration: number; + next: string[]; } export interface LogEvent extends InternalEvent { diff --git a/packages/engine-multi/test/integration.test.ts b/packages/engine-multi/test/integration.test.ts index fc8f0286b..03e9e8b84 100644 --- a/packages/engine-multi/test/integration.test.ts +++ b/packages/engine-multi/test/integration.test.ts @@ -81,7 +81,11 @@ test.serial('trigger job-complete', (t) => { const plan = createPlan(); - api.execute(plan).on('job-complete', () => { + api.execute(plan).on('job-complete', (evt) => { + t.deepEqual(evt.next, []); + t.true(evt.duration < 10); + t.is(evt.jobId, 'j1'); + t.deepEqual(evt.state, { data: {} }); t.pass('job completed'); done(); }); From 3c56e2c2e2b2eb3cc90817549797bfe7d5be8a7c Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 15:05:36 +0000 Subject: [PATCH 07/22] engine: types --- packages/engine-multi/src/worker/mock-worker.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/engine-multi/src/worker/mock-worker.ts b/packages/engine-multi/src/worker/mock-worker.ts index f83d9396a..c55c88f72 100644 --- a/packages/engine-multi/src/worker/mock-worker.ts +++ b/packages/engine-multi/src/worker/mock-worker.ts @@ -64,6 +64,7 @@ function mock(plan: MockExecutionPlan) { jobId, duration: 100, state, + next: [], }); resolve(state); }, job._delay || 1); From 992e3427f812ffe8dd71dd856373753a635f8a83 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 15:43:26 +0000 Subject: [PATCH 08/22] worker: re-work input_dataclip_id --- packages/ws-worker/src/api/execute.ts | 59 +++++++++++++------ packages/ws-worker/src/events.ts | 7 ++- packages/ws-worker/src/mock/runtime-engine.ts | 20 ++++--- packages/ws-worker/test/api/execute.test.ts | 56 +++++++++++++++--- packages/ws-worker/test/integration.test.ts | 3 + 5 files changed, 110 insertions(+), 35 deletions(-) diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index ffcd6cee0..6a259062d 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -10,7 +10,7 @@ import { GET_CREDENTIAL, GET_DATACLIP, RUN_COMPLETE, - RUN_COMPLETE_PAYLOAD, + RunCompletePayload, RUN_START, RUN_START_PAYLOAD, } from '../events'; @@ -18,13 +18,14 @@ import { AttemptOptions, Channel, ExitReason } from '../types'; import { getWithReply, stringify } from '../util'; import type { JSONLog, Logger } from '@openfn/logger'; -import { +import type { + RuntimeEngine, + Resolvers, JobCompleteEvent, WorkflowCompleteEvent, WorkflowErrorEvent, WorkflowStartEvent, -} from '../mock/runtime-engine'; -import type { RuntimeEngine, Resolvers } from '@openfn/engine-multi'; +} from '@openfn/engine-multi'; import { ExecutionPlan } from '@openfn/runtime'; import { calculateAttemptExitReason, calculateJobExitReason } from './reasons'; @@ -36,6 +37,9 @@ export type AttemptState = { plan: ExecutionPlan; options: AttemptOptions; dataclips: Record; + // For each run, map the input ids + // TODO better name maybe? + inputDataclips: Record; reasons: Record; // final dataclip id @@ -61,15 +65,27 @@ const eventMap = { export const createAttemptState = ( plan: ExecutionPlan, options: AttemptOptions = {} -): AttemptState => ({ - plan, - // set the result data clip id (which needs renaming) - // to the initial state - lastDataclipId: plan.initialState as string | undefined, - dataclips: {}, - reasons: {}, - options, -}); +): AttemptState => { + const state = { + plan, + lastDataclipId: '', + dataclips: {}, + inputDataclips: {}, + reasons: {}, + options, + } as AttemptState; + + if (typeof plan.initialState === 'string') { + const startJobId = plan.start ?? plan.jobs[0].id; + state.inputDataclips[startJobId] = plan.initialState; + } else { + // what if initial state is an object? + // In practice I don't think this will happen, + // but the first input_state_id will be messed up + } + + return state; +}; // pass a web socket connected to the attempt channel // this thing will do all the work @@ -179,10 +195,13 @@ export function onJobStart({ channel, state }: Context, event: any) { // generate a run id and write it to state state.activeRun = crypto.randomUUID(); state.activeJob = event.jobId; + + const input_dataclip_id = state.inputDataclips[event.jobId]; + return sendEvent(channel, RUN_START, { run_id: state.activeRun!, job_id: state.activeJob!, - input_dataclip_id: state.lastDataclipId, + input_dataclip_id, }); } @@ -235,6 +254,11 @@ export function onJobComplete( // so we have a bit of a mapping problem state.lastDataclipId = dataclipId; + // Set the input dataclip id for downstream jobs + event.next?.forEach((nextJobId) => { + state.inputDataclips[nextJobId] = dataclipId; + }); + delete state.activeRun; delete state.activeJob; try { @@ -245,7 +269,7 @@ export function onJobComplete( ); state.reasons[job_id] = { reason, error_message, error_type }; - return sendEvent(channel, RUN_COMPLETE, { + return sendEvent(channel, RUN_COMPLETE, { run_id, job_id, output_dataclip_id: dataclipId, @@ -267,13 +291,12 @@ export function onWorkflowStart( return sendEvent(channel, ATTEMPT_START); } -// TODO what this needs to do is look at all the job states -// find the higher priority -// And return that as the highest exit reason export async function onWorkflowComplete( { state, channel, onComplete }: Context, _event: WorkflowCompleteEvent ) { + // TODO I dont think the attempt final dataclip IS the last job dataclip + // Especially not in parallelisation const result = state.dataclips[state.lastDataclipId!]; const reason = calculateAttemptExitReason(state); await sendEvent(channel, ATTEMPT_COMPLETE, { diff --git a/packages/ws-worker/src/events.ts b/packages/ws-worker/src/events.ts index 5ed9e3665..b01fa61e2 100644 --- a/packages/ws-worker/src/events.ts +++ b/packages/ws-worker/src/events.ts @@ -50,16 +50,17 @@ export type RUN_START_PAYLOAD = { job_id: string; run_id: string; attempt_id?: string; - input_dataclip_id?: string; //hmm + input_dataclip_id?: string; }; export type RUN_START_REPLY = void; export const RUN_COMPLETE = 'run:complete'; -export type RUN_COMPLETE_PAYLOAD = ExitReason & { +export type RunCompletePayload = ExitReason & { attempt_id?: string; job_id: string; run_id: string; output_dataclip?: string; output_dataclip_id?: string; + next_job_ids: string[]; }; -export type RUN_COMPLETE_REPLY = void; +export type RunCompleteReply = void; diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 0525bb96e..9738cfd98 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -19,12 +19,12 @@ export type JobStartEvent = { runId: string; // run id. Not sure we need this. }; -export type JobCompleteEvent = { - workflowId: string; - jobId: string; - state: State; // do we really want to publish the intermediate events? Could be important, but also could be sensitive - // I suppose at this level yes, we should publish it -}; +// export type JobCompleteEvent = { +// workflowId: string; +// jobId: string; +// state: State; // do we really want to publish the intermediate events? Could be important, but also could be sensitive +// // I suppose at this level yes, we should publish it +// }; export type WorkflowStartEvent = { workflowId: string; @@ -127,7 +127,13 @@ async function createMock() { } } - dispatch('job-complete', { workflowId, jobId, state: nextState, runId }); + dispatch('job-complete', { + workflowId, + jobId, + state: nextState, + runId, + next: [], // TODO hmm. I think we need to do better than this. + }); return nextState; }; diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index afd5df872..bfaac3289 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -45,6 +45,24 @@ const mockEventHandlers = { // This is a nonsense timestamp but it's fine for the test (and easy to convert) const getBigIntTimestamp = () => (BigInt(Date.now()) * BigInt(1e6)).toString(); +test('createAttemptState: set initial input dataclip for job[0]', (t) => { + const plan = { initialState: 'x', jobs: [{ id: 'a' }] }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 'x' }); +}); + +test('createAttemptState: set initial input dataclip for attempt.start', (t) => { + const plan = { + initialState: 'x', + start: 'a', + jobs: [{ id: 'b' }, { id: 'a' }], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 'x' }); +}); + test('send event should resolve when the event is acknowledged', async (t) => { const channel = mockChannel({ echo: (x) => x, @@ -70,9 +88,7 @@ test('jobStart should set a run id and active job on state', async (t) => { const plan = { id: 'attempt-1' }; const jobId = 'job-1'; - const state = { - plan, - } as AttemptState; + const state = createAttemptState(plan); const channel = mockChannel({ [RUN_START]: (x) => x, @@ -85,18 +101,21 @@ test('jobStart should set a run id and active job on state', async (t) => { }); test('jobStart should send a run:start event', async (t) => { - const plan = { id: 'attempt-1' }; + const plan = { + id: 'attempt-1', + initialState: 'abc', + jobs: [{ id: 'job-1' }, { id: 'job-2' }], + }; const jobId = 'job-1'; const state = createAttemptState(plan); state.activeJob = jobId; state.activeRun = 'b'; - state.lastDataclipId = 'abc'; // this will be set to initial state by execute const channel = mockChannel({ [RUN_START]: (evt) => { t.is(evt.job_id, jobId); - t.is(evt.input_dataclip_id, state.lastDataclipId); + t.is(evt.input_dataclip_id, plan.initialState); t.truthy(evt.run_id); return true; }, @@ -124,6 +143,29 @@ test('jobComplete should clear the run id and active job on state', async (t) => t.falsy(state.activeRun); }); +test('jobComplete should setup input mappings on on state', async (t) => { + let lightningEvent; + const plan = { id: 'attempt-1' }; + const jobId = 'job-1'; + + const state = createAttemptState(plan); + state.activeJob = jobId; + state.activeRun = 'b'; + + const channel = mockChannel({ + [RUN_COMPLETE]: (evt) => { + lightningEvent = evt; + }, + }); + + const engineEvent = { state: { x: 10 }, next: ['job-2'] }; + await onJobComplete({ channel, state }, engineEvent); + + t.deepEqual(state.inputDataclips, { + ['job-2']: lightningEvent.output_dataclip_id, + }); +}); + test('jobComplete should save the dataclip to state', async (t) => { const plan = { id: 'attempt-1' } as ExecutionPlan; const jobId = 'job-1'; @@ -211,7 +253,7 @@ test('jobComplete should send a run:complete event', async (t) => { }, }); - const event = { state: result }; + const event = { state: result, next: ['a'] }; await onJobComplete({ channel, state }, event); }); diff --git a/packages/ws-worker/test/integration.test.ts b/packages/ws-worker/test/integration.test.ts index 6072abb9a..f90f0d5c0 100644 --- a/packages/ws-worker/test/integration.test.ts +++ b/packages/ws-worker/test/integration.test.ts @@ -443,4 +443,7 @@ test('should not claim while at capacity', async (t) => { }); }); +// hmm, i don't even think I can test this in the mock runtime +test.skip('should pass the right dataclip when running in parallel', () => {}); + test.todo(`should run multiple attempts`); From 84423a437ee31946c9ccf72ec8827f549cfd32b1 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 16:08:09 +0000 Subject: [PATCH 09/22] worker:typings and test rename --- packages/ws-worker/src/api/execute.ts | 23 +++++++++++-------- packages/ws-worker/src/mock/runtime-engine.ts | 16 +------------ ...{integration.test.ts => lightning.test.ts} | 0 3 files changed, 15 insertions(+), 24 deletions(-) rename packages/ws-worker/test/{integration.test.ts => lightning.test.ts} (100%) diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 6a259062d..b21374bf1 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -21,10 +21,10 @@ import type { JSONLog, Logger } from '@openfn/logger'; import type { RuntimeEngine, Resolvers, - JobCompleteEvent, - WorkflowCompleteEvent, - WorkflowErrorEvent, - WorkflowStartEvent, + JobCompletePayload, + WorkflowCompletePayload, + WorkflowErrorPayload, + WorkflowStartPayload, } from '@openfn/engine-multi'; import { ExecutionPlan } from '@openfn/runtime'; import { calculateAttemptExitReason, calculateJobExitReason } from './reasons'; @@ -77,6 +77,7 @@ export const createAttemptState = ( if (typeof plan.initialState === 'string') { const startJobId = plan.start ?? plan.jobs[0].id; + // @ts-ignore state.inputDataclips[startJobId] = plan.initialState; } else { // what if initial state is an object? @@ -171,7 +172,11 @@ export function execute( engine.execute(plan, { resolvers, ...options }); } catch (e: any) { // TODO what if there's an error? - onWorkflowError(context, { workflowId: plan.id!, message: e.message }); + onWorkflowError(context, { + workflowId: plan.id!, + message: e.message, + type: e.type, + }); } }); @@ -232,7 +237,7 @@ export function onJobError(context: Context, event: any) { // b) save the reason for each job to state for later export function onJobComplete( { channel, state }: Context, - event: JobCompleteEvent, + event: JobCompletePayload, // TODO this isn't terribly graceful, but accept an error for crashes error?: any ) { @@ -286,14 +291,14 @@ export function onJobComplete( export function onWorkflowStart( { channel }: Context, - _event: WorkflowStartEvent + _event: WorkflowStartPayload ) { return sendEvent(channel, ATTEMPT_START); } export async function onWorkflowComplete( { state, channel, onComplete }: Context, - _event: WorkflowCompleteEvent + _event: WorkflowCompletePayload ) { // TODO I dont think the attempt final dataclip IS the last job dataclip // Especially not in parallelisation @@ -311,7 +316,7 @@ export async function onWorkflowComplete( // NB this is a crash state! export async function onWorkflowError( { state, channel, onComplete }: Context, - event: WorkflowErrorEvent + event: WorkflowErrorPayload ) { // Should we not just report this reason? // Nothing more severe can have happened downstream, right? diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 9738cfd98..9e4380294 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -2,7 +2,6 @@ import crypto from 'node:crypto'; import { EventEmitter } from 'node:events'; import type { ExecutionPlan, JobNode } from '@openfn/runtime'; import * as engine from '@openfn/engine-multi'; -import type { State } from '../types'; import mockResolvers from './resolvers'; export type EngineEvent = @@ -13,19 +12,6 @@ export type EngineEvent = | typeof engine.WORKFLOW_LOG | typeof engine.WORKFLOW_START; -export type JobStartEvent = { - workflowId: string; - jobId: string; - runId: string; // run id. Not sure we need this. -}; - -// export type JobCompleteEvent = { -// workflowId: string; -// jobId: string; -// state: State; // do we really want to publish the intermediate events? Could be important, but also could be sensitive -// // I suppose at this level yes, we should publish it -// }; - export type WorkflowStartEvent = { workflowId: string; }; @@ -62,7 +48,7 @@ async function createMock() { // TODO: Listeners will be removed when the plan is complete (?) const listen = ( planId: string, - events: Record void> + events: Record void> ) => { listeners[planId] = events; }; diff --git a/packages/ws-worker/test/integration.test.ts b/packages/ws-worker/test/lightning.test.ts similarity index 100% rename from packages/ws-worker/test/integration.test.ts rename to packages/ws-worker/test/lightning.test.ts From f96a56d72b2b77d04c139d33954a93fd5df742a6 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 16:56:27 +0000 Subject: [PATCH 10/22] tests: add tests for parallel workflow --- integration-tests/worker/package.json | 3 +- integration-tests/worker/src/factories.ts | 24 ++++ integration-tests/worker/src/init.ts | 4 +- .../worker/test/attempts.test.ts | 127 ++++++++++++++++++ .../worker/test/exit-reasons.test.ts | 2 +- 5 files changed, 156 insertions(+), 4 deletions(-) create mode 100644 integration-tests/worker/src/factories.ts create mode 100644 integration-tests/worker/test/attempts.test.ts diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 754a523f4..1b0446502 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -11,7 +11,8 @@ "build:pack": "pnpm clean && cd ../.. && pnpm pack:local integration-tests/worker/dist --no-version", "build": "pnpm build:pack && docker build --tag worker-integration-tests .", "start": "docker run worker-integration-tests", - "test": "pnpm clean && npx ava -s --timeout 2m" + "test": "pnpm clean && npx ava -s --timeout 2m", + "test:cache": "npx ava -s --timeout 2m" }, "dependencies": { "@openfn/engine-multi": "workspace:^", diff --git a/integration-tests/worker/src/factories.ts b/integration-tests/worker/src/factories.ts new file mode 100644 index 000000000..f39729275 --- /dev/null +++ b/integration-tests/worker/src/factories.ts @@ -0,0 +1,24 @@ +import crypto from 'node:crypto'; + +export const createAttempt = (jobs, edges, args) => ({ + id: crypto.randomUUID(), + jobs, + edges, + ...args, +}); + +export const createJob = (args) => ({ + id: crypto.randomUUID(), + adaptor: '@openfn/language-common@latest', + expression: 'fn((s) => s)', + ...args, +}); + +export const createEdge = (a: any, b: any, condition?: string) => ({ + id: crypto.randomUUID(), + source_job_id: a.id, + target_job_id: b.id, + // condition, +}); + +export default createAttempt; diff --git a/integration-tests/worker/src/init.ts b/integration-tests/worker/src/init.ts index 19777be5d..2c57c2e61 100644 --- a/integration-tests/worker/src/init.ts +++ b/integration-tests/worker/src/init.ts @@ -4,7 +4,7 @@ import crypto from 'node:crypto'; import createLightningServer from '@openfn/lightning-mock'; import createEngine from '@openfn/engine-multi'; import createWorkerServer from '@openfn/ws-worker'; -import { createMockLogger } from '@openfn/logger'; +import createLogger, { createMockLogger } from '@openfn/logger'; export const randomPort = () => parseInt(2000 + Math.random() * 1000); @@ -18,7 +18,7 @@ export const initWorker = async (lightningPort, engineArgs = {}) => { const workerPort = randomPort(); const engine = await createEngine({ - // logger: createLogger('engine', { level: 'debug' }), + logger: createLogger('engine', { level: 'debug' }), logger: createMockLogger(), repoDir: path.resolve('./tmp/repo/default'), ...engineArgs, diff --git a/integration-tests/worker/test/attempts.test.ts b/integration-tests/worker/test/attempts.test.ts new file mode 100644 index 000000000..a8c35a599 --- /dev/null +++ b/integration-tests/worker/test/attempts.test.ts @@ -0,0 +1,127 @@ +import test from 'ava'; +import path from 'node:path'; + +import { createAttempt, createEdge, createJob } from '../src/factories'; +import { initLightning, initWorker } from '../src/init'; + +let lightning; +let worker; + +test.before(async () => { + const lightningPort = 4321; + + lightning = initLightning(lightningPort); + + ({ worker } = await initWorker(lightningPort, { + repoDir: path.resolve('tmp/openfn/repo/attempts'), + })); +}); + +test.after(async () => { + lightning.destroy(); + await worker.destroy(); +}); + +const run = async (attempt) => { + return new Promise(async (done, reject) => { + lightning.once('attempt:complete', (evt) => { + if (attempt.id === evt.attemptId) { + done(lightning.getResult(attempt.id)); + } else { + // If we get here, something has gone very wrong + reject('attempt not found'); + } + }); + + lightning.enqueueAttempt(attempt); + }); +}; + +test('echo initial state', async (t) => { + const initialState = { data: { count: 22 } }; + + lightning.addDataclip('s1', initialState); + + const job = createJob({ body: 'fn((s) => s)' }); + const attempt = createAttempt([job], [], { + dataclip_id: 's1', + }); + + const result = await run(attempt); + + t.deepEqual(result, { + data: { + count: 22, + }, + }); +}); + +// hmm this event feels a bit fine-grained for this +// This file should just be about input-output +// TODO maybe move it into integrations later +test('run parallel jobs', async (t) => { + const initialState = { data: { count: 22 } }; + + lightning.addDataclip('s1', initialState); + + /* + [a] + / \ + [x] [y] + */ + const a = createJob({ body: 'fn((s) => ({ data: { a: true }}))' }); + const x = createJob({ body: 'fn((s) => { s.data.x = true; return s; })' }); + const y = createJob({ body: 'fn((s) => { s.data.y = true; return s; })' }); + const ax = createEdge(a, x); + const ay = createEdge(a, y); + const jobs = [a, x, y]; + const edges = [ax, ay]; + + const attempt = createAttempt(jobs, edges, { + dataclip_id: 's1', + }); + + // This saves JSON returned by a job + const outputJson = {}; + + // This saves the dataclip returned by a job + const outputId = {}; + + lightning.on('run:start', (evt) => { + // x and y should both be passed the dataclip produced by job a + if (evt.payload.run_id === x.id || evt.payload.run_id === y.id) { + evt.payload.input_dataclip_id = outputId[a.id]; + } + }); + + lightning.on('run:complete', (evt) => { + // save the output dataclip + outputJson[evt.payload.job_id] = evt.payload.output_dataclip_id; + outputJson[evt.payload.job_id] = JSON.parse(evt.payload.output_dataclip); + }); + + const result = await run(attempt); + + t.deepEqual(outputJson[x.id].data, { + a: true, + x: true, + // Should not include a write from y + }); + t.deepEqual(outputJson[y.id].data, { + a: true, + y: true, + // Should not include a write from x + }); + + // I think the result should look like this - but it won't without work + // t.deepEqual(result, { + // [x.id]: { + // a: true, + // x: true, + // }, + // [y.id]: { + // a: true, + // y: true, + // }, + // }); +}); diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index 6be55871e..550b1ac99 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -24,7 +24,7 @@ test.after(async () => { const run = async (attempt) => { return new Promise(async (done) => { - lightning.on('attempt:complete', (evt) => { + lightning.once('attempt:complete', (evt) => { if (attempt.id === evt.attemptId) { done(evt.payload); } From f6bd41fe0a027cb3c7db13dcc4c0cecaf5f2b8d4 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 16:57:15 +0000 Subject: [PATCH 11/22] engine: remove log line --- packages/engine-multi/src/api/lifecycle.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index e876a35a1..dfaf8aacb 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -45,7 +45,7 @@ export const workflowComplete = ( const { workflowId, state: result, threadId } = event; logger.success('complete workflow ', workflowId); - logger.info(state); + //logger.info(event.state); // TODO I don't know how we'd get here in this architecture // if (!allWorkflows.has(workflowId)) { From 334098e1c58ccedc7bb11d37ad448297444e448f Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 14 Nov 2023 16:57:31 +0000 Subject: [PATCH 12/22] package lock --- pnpm-lock.yaml | 510 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 499 insertions(+), 11 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3c352c711..7940cf52d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -146,6 +146,27 @@ importers: specifier: ^5.1.6 version: 5.1.6 + integration-tests/worker/tmp/openfn/repo/attempts: + dependencies: + '@openfn/language-common_latest': + specifier: npm:@openfn/language-common@^1.11.1 + version: /@openfn/language-common@1.11.1 + + integration-tests/worker/tmp/openfn/repo/exit-reason: + dependencies: + '@openfn/language-common_latest': + specifier: npm:@openfn/language-common@^1.11.1 + version: /@openfn/language-common@1.11.1 + + integration-tests/worker/tmp/openfn/repo/integration: + dependencies: + '@openfn/language-common_latest': + specifier: npm:@openfn/language-common@^1.11.1 + version: /@openfn/language-common@1.11.1 + '@openfn/language-http_5.0.4': + specifier: npm:@openfn/language-http@^5.0.4 + version: /@openfn/language-http@5.0.4 + packages/cli: dependencies: '@inquirer/prompts': @@ -1330,6 +1351,11 @@ packages: heap: 0.2.7 dev: false + /@fastify/busboy@2.1.0: + resolution: {integrity: sha512-+KpH+QxZU7O4675t3mnkQKcZZg56u+K/Ct2K+N2AZYNVK8kyeo/bI18tI8aPm3tvNNRyTWfj6s5tnGNlcbQRsA==} + engines: {node: '>=14'} + dev: false + /@inquirer/checkbox@1.3.5: resolution: {integrity: sha512-ZznkPU+8XgNICKkqaoYENa0vTw9jeToEHYyG5gUKpGmY+4PqPTsvLpSisOt9sukLkYzPRkpSCHREgJLqbCG3Fw==} engines: {node: '>=14.18.0'} @@ -1595,6 +1621,21 @@ packages: semver: 7.5.4 dev: true + /@openfn/language-common@1.11.1: + resolution: {integrity: sha512-pyi2QymdF9NmUYJX/Bsv5oBy7TvzICfKcnCqutq412HYq2KTGKDO2dMWloDrxrH1kuzG+4XkSn0ZUom36b3KAA==} + dependencies: + ajv: 8.12.0 + axios: 1.1.3 + csv-parse: 5.5.2 + csvtojson: 2.0.10 + date-fns: 2.30.0 + jsonpath-plus: 4.0.0 + lodash: 4.17.21 + undici: 5.27.2 + transitivePeerDependencies: + - debug + dev: false + /@openfn/language-common@1.7.5: resolution: {integrity: sha512-QivV3v5Oq5fb4QMopzyqUUh+UGHaFXBdsGr6RCmu6bFnGXdJdcQ7GpGpW5hKNq29CkmE23L/qAna1OLr4rP/0w==} dependencies: @@ -1610,6 +1651,22 @@ packages: resolution: {integrity: sha512-7kwhBnCd1idyTB3MD9dXmUqROAhoaUIkz2AGDKuv9vn/cbZh7egEv9/PzKkRcDJYFV9qyyS+cVT3Xbgsg2ii5g==} bundledDependencies: [] + /@openfn/language-http@5.0.4: + resolution: {integrity: sha512-zuMlJyORxBps0KO+93a3kVBRzStGwYVNAOEl7GgvO6Z96YBO7/K2NiqSyMHTyQeIyCLUBGmrEv1AAuk4pXxKAg==} + dependencies: + '@openfn/language-common': 1.11.1 + cheerio: 1.0.0-rc.12 + cheerio-tableparser: 1.0.1 + csv-parse: 4.16.3 + fast-safe-stringify: 2.1.1 + form-data: 3.0.1 + lodash: 4.17.21 + request: 2.88.2 + tough-cookie: 4.1.3 + transitivePeerDependencies: + - debug + dev: false + /@pkgjs/parseargs@0.11.0: resolution: {integrity: sha512-+1VkjdD0QBLPodGrJUeqarH8VAIvQODIbwh9XpP5Syisf7YoQgsJKPNFoqqLQlu+VQ/tVSshMR6loPMn8U+dPg==} engines: {node: '>=14'} @@ -2056,6 +2113,24 @@ packages: clean-stack: 4.2.0 indent-string: 5.0.0 + /ajv@6.12.6: + resolution: {integrity: sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==} + dependencies: + fast-deep-equal: 3.1.3 + fast-json-stable-stringify: 2.1.0 + json-schema-traverse: 0.4.1 + uri-js: 4.4.1 + dev: false + + /ajv@8.12.0: + resolution: {integrity: sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==} + dependencies: + fast-deep-equal: 3.1.3 + json-schema-traverse: 1.0.0 + require-from-string: 2.0.2 + uri-js: 4.4.1 + dev: false + /ansi-colors@4.1.3: resolution: {integrity: sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==} engines: {node: '>=6'} @@ -2191,6 +2266,17 @@ packages: resolution: {integrity: sha512-tLkvA81vQG/XqE2mjDkGQHoOINtMHtysSnemrmoGe6PydDPMRbVugqyk4A6V/WDWEfm3l+0d8anA9r8cv/5Jaw==} engines: {node: '>=12'} + /asn1@0.2.6: + resolution: {integrity: sha512-ix/FxPn0MDjeyJ7i/yoHGFt/EX6LyNbxSEhPPXODPL+KB0VPk86UYfL0lMdy+KCnv+fmvIzySwaK5COwqVbWTQ==} + dependencies: + safer-buffer: 2.1.2 + dev: false + + /assert-plus@1.0.0: + resolution: {integrity: sha512-NfJ4UzBCcQGLDlQq7nHxH+tv3kyZ0hHQqF5BO6J7tNJeP5do1llPr8dZ8zHonfhAu0PHAdMkSo+8o0wxg9lZWw==} + engines: {node: '>=0.8'} + dev: false + /assign-symbols@1.0.0: resolution: {integrity: sha512-Q+JC7Whu8HhmTdBph/Tq59IoRtoy6KAm5zzPv00WdujX82lbAL8K7WVjne7vdCsAmbF4AYaDOPyO3k0kl8qIrw==} engines: {node: '>=0.10.0'} @@ -2216,7 +2302,6 @@ packages: /asynckit@0.4.0: resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} - dev: true /atob@2.1.2: resolution: {integrity: sha512-Wm6ukoaOGJi/73p/cl2GvLjTI5JM1k/O14isD73YML8StrH/7/lRFgmg8nICZgD3bZZvjwCGxtMOD3wWNAu8cg==} @@ -2347,6 +2432,14 @@ packages: fast-glob: 3.3.1 dev: true + /aws-sign2@0.7.0: + resolution: {integrity: sha512-08kcGqnYf/YmjoRhfxyu+CLxBjUtHLXLXX/vUfx9l2LYzG3c1m61nrpyFUZI6zeS+Li/wWMMidD9KgrqtGq3mA==} + dev: false + + /aws4@1.12.0: + resolution: {integrity: sha512-NmWvPnx0F1SfrQbYwOi7OeaNGokp9XhzNioJ/CSBs8Qa4vxug81mhJEAVZwxXuBmYB5KDRfMq/F3RR0BIU7sWg==} + dev: false + /axios@0.27.2: resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} dependencies: @@ -2364,7 +2457,6 @@ packages: proxy-from-env: 1.1.0 transitivePeerDependencies: - debug - dev: true /b4a@1.6.1: resolution: {integrity: sha512-AsKjNhz72yxteo/0EtQEiwkMUgk/tGmycXlbG4g3Ard2/ULtNLUykGOkeK0egmN27h0xMAhb76jYccW+XTBExA==} @@ -2401,6 +2493,12 @@ packages: resolution: {integrity: sha512-x+VAiMRL6UPkx+kudNvxTl6hB2XNNCG2r+7wixVfIYwu/2HKRXimwQyaumLjMveWvT2Hkd/cAJw+QBMfJ/EKVw==} dev: true + /bcrypt-pbkdf@1.0.2: + resolution: {integrity: sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w==} + dependencies: + tweetnacl: 0.14.5 + dev: false + /bcryptjs@2.4.3: resolution: {integrity: sha512-V/Hy/X9Vt7f3BbPJEi8BdVFMByHi+jNXrYkW3huaybV/kQ0KJg0Y6PkEMbn+zeT+i+SiKZ/HMqJGIIt4LZDqNQ==} dev: true @@ -2445,9 +2543,17 @@ packages: readable-stream: 4.2.0 dev: true + /bluebird@3.7.2: + resolution: {integrity: sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==} + dev: false + /blueimp-md5@2.19.0: resolution: {integrity: sha512-DRQrD6gJyy8FbiE4s+bDoXS9hiW3Vbx5uCdwvcCf3zLHL+Iv7LtGHLpr+GZV8rHG8tK766FGYBwRbu8pELTt+w==} + /boolbase@1.0.0: + resolution: {integrity: sha512-JZOSA7Mo9sNGB8+UjSgzdLtokWAky1zbztM3WRLCbZ70/3cTANmQmOdR7y2g+J0e2WXywy1yS468tY+IruqEww==} + dev: false + /brace-expansion@1.1.11: resolution: {integrity: sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==} dependencies: @@ -2632,6 +2738,10 @@ packages: engines: {node: '>=6'} dev: true + /caseless@0.12.0: + resolution: {integrity: sha512-4tYFyifaFfGacoiObjJegolkwSU4xQNGbVgUiNYVUxbQ2x2lUsFvY4hVgVzGiIe6WLOPqycWXA40l+PWsxthUw==} + dev: false + /cbor@8.1.0: resolution: {integrity: sha512-DwGjNW9omn6EwP70aXsn7FQJx5kO12tX0bZkaTjzdVFM6/7nhA4t0EENocKGx6D2Bch9PE2KzCUf5SceBdeijg==} engines: {node: '>=12.19'} @@ -2665,6 +2775,34 @@ packages: /chardet@0.7.0: resolution: {integrity: sha512-mT8iDcrh03qDGRRmoA2hmBJnxpllMR+0/0qlzjqZES6NdiWDcZkCNAk4rPFZ9Q85r27unkiNNg8ZOiwZXBHwcA==} + /cheerio-select@2.1.0: + resolution: {integrity: sha512-9v9kG0LvzrlcungtnJtpGNxY+fzECQKhK4EGJX2vByejiMX84MFNQw4UxPJl3bFbTMw+Dfs37XaIkCwTZfLh4g==} + dependencies: + boolbase: 1.0.0 + css-select: 5.1.0 + css-what: 6.1.0 + domelementtype: 2.3.0 + domhandler: 5.0.3 + domutils: 3.1.0 + dev: false + + /cheerio-tableparser@1.0.1: + resolution: {integrity: sha512-SCSWdMoFvIue0jdFZqRNPXDCZ67vuirJEG3pfh3AAU2hwxe/qh1EQUkUNPWlZhd6DMjRlTfcpcPWbaowjwRnNQ==} + dev: false + + /cheerio@1.0.0-rc.12: + resolution: {integrity: sha512-VqR8m68vM46BNnuZ5NtnGBKIE/DfN0cRIzg9n40EIq9NOv90ayxLBXA8fXC5gquFRGJSTRqBq25Jt2ECLR431Q==} + engines: {node: '>= 6'} + dependencies: + cheerio-select: 2.1.0 + dom-serializer: 2.0.0 + domhandler: 5.0.3 + domutils: 3.1.0 + htmlparser2: 8.0.2 + parse5: 7.1.2 + parse5-htmlparser2-tree-adapter: 7.0.0 + dev: false + /chokidar@2.1.8: resolution: {integrity: sha512-ZmZUazfOzf0Nve7duiCKD23PFSCs4JPoYyccjUFF3aQkQadqBhfzhjkwBH2mNOG9cTBwhamM37EIsIkZw3nRgg==} deprecated: Chokidar 2 does not receive security updates since 2019. Upgrade to chokidar 3 with 15x fewer dependencies @@ -2841,7 +2979,6 @@ packages: engines: {node: '>= 0.8'} dependencies: delayed-stream: 1.0.0 - dev: true /commander@4.1.1: resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==} @@ -2916,6 +3053,10 @@ packages: resolution: {integrity: sha512-3DdaFaU/Zf1AnpLiFDeNCD4TOWe3Zl2RZaTzUvWiIk5ERzcCodOE20Vqq4fzCbNoHURFHT4/us/Lfq+S2zyY4w==} dev: false + /core-util-is@1.0.2: + resolution: {integrity: sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==} + dev: false + /core-util-is@1.0.3: resolution: {integrity: sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==} dev: true @@ -2966,6 +3107,21 @@ packages: which: 2.0.2 dev: true + /css-select@5.1.0: + resolution: {integrity: sha512-nwoRF1rvRRnnCqqY7updORDsuqKzqYJ28+oSMaJMMgOauh3fvwHqMS7EZpIPqK8GL+g9mKxF1vP/ZjSeNjEVHg==} + dependencies: + boolbase: 1.0.0 + css-what: 6.1.0 + domhandler: 5.0.3 + domutils: 3.1.0 + nth-check: 2.1.1 + dev: false + + /css-what@6.1.0: + resolution: {integrity: sha512-HTUrgRJ7r4dsZKU6GjmpfRK1O76h97Z8MfS1G0FozR+oF2kG6Vfe8JE6zwrkbxigziPHinCJ+gCPjA9EaBDtRw==} + engines: {node: '>= 6'} + dev: false + /cssesc@3.0.0: resolution: {integrity: sha512-/Tb/JcjK111nNScGob5MNtsntNM1aCNUDipB/TkwZFhyDrrE47SOx/18wF2bbjgc3ZzCSKW1T5nt5EbFoAz/Vg==} engines: {node: '>=4'} @@ -2982,7 +3138,10 @@ packages: /csv-parse@4.16.3: resolution: {integrity: sha512-cO1I/zmz4w2dcKHVvpCr7JVRu8/FymG5OEpmvsZYlccYolPBLoVGKUHgNoc4ZGkFeFlWGEDmMyBM+TTqRdW/wg==} - dev: true + + /csv-parse@5.5.2: + resolution: {integrity: sha512-YRVtvdtUNXZCMyK5zd5Wty1W6dNTpGKdqQd4EQ8tl/c6KW1aMBB1Kg1ppky5FONKmEqGJ/8WjLlTNLPne4ioVA==} + dev: false /csv-stringify@5.6.5: resolution: {integrity: sha512-PjiQ659aQ+fUTQqSrd1XEDnOr52jh30RBurfzkscaE2tPaFsDH5wOAHJiw8XAHphRknCwMUE9KRayc4K/NbO8A==} @@ -2998,12 +3157,29 @@ packages: stream-transform: 2.1.3 dev: true + /csvtojson@2.0.10: + resolution: {integrity: sha512-lUWFxGKyhraKCW8Qghz6Z0f2l/PqB1W3AO0HKJzGIQ5JRSlR651ekJDiGJbBT4sRNNv5ddnSGVEnsxP9XRCVpQ==} + engines: {node: '>=4.0.0'} + hasBin: true + dependencies: + bluebird: 3.7.2 + lodash: 4.17.21 + strip-bom: 2.0.0 + dev: false + /currently-unhandled@0.4.1: resolution: {integrity: sha512-/fITjgjGU50vjQ4FH6eUoYu+iUoUKIXws2hL15JJpIR+BbTxaXQsMuuyjtNh2WqsSBS5nsaZHFsFecyw5CCAng==} engines: {node: '>=0.10.0'} dependencies: array-find-index: 1.0.2 + /dashdash@1.14.1: + resolution: {integrity: sha512-jRFi8UDGo6j+odZiEpjazZaWqEal3w/basFjQHQEwVtZJGDpxbH1MeYluwCS8Xq5wmLJooDlMgvVarmWfGM44g==} + engines: {node: '>=0.10'} + dependencies: + assert-plus: 1.0.0 + dev: false + /date-fns@2.30.0: resolution: {integrity: sha512-fnULvOpxnC5/Vg3NCiWelDsLiUc9bRwAPs/+LfTLNvetFCtCTN+yQz15C/fs4AwX1R9K5GLtLfn8QW+dWisaAw==} engines: {node: '>=0.11'} @@ -3138,7 +3314,6 @@ packages: /delayed-stream@1.0.0: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} - dev: true /delegates@1.0.0: resolution: {integrity: sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==} @@ -3189,6 +3364,33 @@ packages: resolution: {integrity: sha512-+HlytyjlPKnIG8XuRG8WvmBP8xs8P71y+SKKS6ZXWoEgLuePxtDoUEiH7WkdePWrQ5JBpE6aoVqfZfJUQkjXwA==} dev: true + /dom-serializer@2.0.0: + resolution: {integrity: sha512-wIkAryiqt/nV5EQKqQpo3SToSOV9J0DnbJqwK7Wv/Trc92zIAYZ4FlMu+JPFW1DfGFt81ZTCGgDEabffXeLyJg==} + dependencies: + domelementtype: 2.3.0 + domhandler: 5.0.3 + entities: 4.5.0 + dev: false + + /domelementtype@2.3.0: + resolution: {integrity: sha512-OLETBj6w0OsagBwdXnPdN0cnMfF9opN69co+7ZrbfPGrdpPVNBUj02spi6B1N7wChLQiPn4CSH/zJvXw56gmHw==} + dev: false + + /domhandler@5.0.3: + resolution: {integrity: sha512-cgwlv/1iFQiFnU96XXgROh8xTeetsnJiDsTc7TYCLFd9+/WNkIqPTxiM/8pSd8VIrhXGTf1Ny1q1hquVqDJB5w==} + engines: {node: '>= 4'} + dependencies: + domelementtype: 2.3.0 + dev: false + + /domutils@3.1.0: + resolution: {integrity: sha512-H78uMmQtI2AhgDJjWeQmHwJJ2bLPD3GMmO7Zja/ZZh84wkm+4ut+IUnUdRa8uCGX88DiVx1j6FRe1XfxEgjEZA==} + dependencies: + dom-serializer: 2.0.0 + domelementtype: 2.3.0 + domhandler: 5.0.3 + dev: false + /dreamopt@0.8.0: resolution: {integrity: sha512-vyJTp8+mC+G+5dfgsY+r3ckxlz+QMX40VjPQsZc5gxVAxLmi64TBoVkP54A/pRAXMXsbu2GMMBrZPxNv23waMg==} engines: {node: '>=0.4.0'} @@ -3212,6 +3414,13 @@ packages: /eastasianwidth@0.2.0: resolution: {integrity: sha512-I88TYZWc9XiYHRQ4/3c5rjjfgkjhLyW2luGIheGERbNQ6OY7yTybanSpDXZa8y7VUP9YmDcYa+eyq4ca7iLqWA==} + /ecc-jsbn@0.1.2: + resolution: {integrity: sha512-eh9O+hwRHNbG4BLTjEl3nw044CkGm5X6LoaCf7LPp7UU8Qrt47JYNi6nPX8xjW97TKGKm1ouctg0QSpZe9qrnw==} + dependencies: + jsbn: 0.1.1 + safer-buffer: 2.1.2 + dev: false + /ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} @@ -3250,6 +3459,11 @@ packages: ansi-colors: 4.1.3 dev: true + /entities@4.5.0: + resolution: {integrity: sha512-V0hjH4dGPh9Ao5p0MoRY6BVqtwCjhz6vI5LT8AJ55H+4g9/4vbHx1I54fS0XuclLhDHArPQCiMjDxjaL8fPxhw==} + engines: {node: '>=0.12'} + dev: false + /err-code@2.0.3: resolution: {integrity: sha512-2bmlRpNKBxT/CRmPOlyISQpNj+qSeYvcym/uT0Jx2bMOlKLtSy1ZmLuVxSEKKyor/N5yhvp/ZiG1oE3DEYMSFA==} dev: true @@ -3936,6 +4150,10 @@ packages: is-extendable: 1.0.1 dev: true + /extend@3.0.2: + resolution: {integrity: sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==} + dev: false + /extendable-error@0.1.7: resolution: {integrity: sha512-UOiS2in6/Q0FK0R0q6UY9vYpQ21mr/Qn1KOnte7vsACuNJf514WvCCUHSRCPcgjPT2bAhNIJdlE6bVap1GKmeg==} dev: true @@ -3964,6 +4182,15 @@ packages: - supports-color dev: true + /extsprintf@1.3.0: + resolution: {integrity: sha512-11Ndz7Nv+mvAC1j0ktTa7fAb0vLyGGX+rMHNBYQviQDGU0Hw7lhctJANqbPhu9nV9/izT/IntTgZ7Im/9LJs9g==} + engines: {'0': node >=0.6.0} + dev: false + + /fast-deep-equal@3.1.3: + resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==} + dev: false + /fast-diff@1.3.0: resolution: {integrity: sha512-VxPP4NqbUjj6MaAOafWeUn2cXWLcCtljklUtZf0Ind4XQ+QPtmA0b18zZy0jIQx+ExRVCR/ZQpBmik5lXshNsw==} @@ -3996,6 +4223,10 @@ packages: resolution: {integrity: sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ==} dev: false + /fast-json-stable-stringify@2.1.0: + resolution: {integrity: sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==} + dev: false + /fast-levenshtein@2.0.6: resolution: {integrity: sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==} dev: true @@ -4120,7 +4351,6 @@ packages: peerDependenciesMeta: debug: optional: true - dev: true /for-in@1.0.2: resolution: {integrity: sha512-7EwmXrOjyL+ChxMhmG5lnW9MPt1aIeZEwKhQzoBUdTV0N3zuwWDZYVJatDvZ2OyzPUvdIAZDsCetk3coyMfcnQ==} @@ -4135,6 +4365,19 @@ packages: signal-exit: 4.0.2 dev: true + /forever-agent@0.6.1: + resolution: {integrity: sha512-j0KLYPhm6zeac4lz3oJ3o65qvgQCcPubiyotZrXqEaG4hNagNYO8qdlUrX5vwqv9ohqeT/Z3j6+yW067yWWdUw==} + dev: false + + /form-data@2.3.3: + resolution: {integrity: sha512-1lLKB2Mu3aGP1Q/2eCOx0fNbRMe7XdwktwOruhfqqd0rIJWwN4Dh+E3hrPSlDCXnSR7UtZ1N38rVXm+6+MEhJQ==} + engines: {node: '>= 0.12'} + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + dev: false + /form-data@2.5.1: resolution: {integrity: sha512-m21N3WOmEEURgk6B9GLOE4RuWOFf28Lhh9qGYeNlGq4VDXUlJy2th2slBNU8Gp8EzloYZOibZJ7t5ecIrFSjVA==} engines: {node: '>= 0.12'} @@ -4144,6 +4387,15 @@ packages: mime-types: 2.1.35 dev: true + /form-data@3.0.1: + resolution: {integrity: sha512-RHkBKtLWUVwd7SqRIvCZMEvAMoGUp0XU+seQiZejj0COz3RI3hWP4sCv3gZWWLjJTd7rGwcsF5eKZGii0r/hbg==} + engines: {node: '>= 6'} + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + dev: false + /form-data@4.0.0: resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} engines: {node: '>= 6'} @@ -4151,7 +4403,6 @@ packages: asynckit: 0.4.0 combined-stream: 1.0.8 mime-types: 2.1.35 - dev: true /fragment-cache@0.2.1: resolution: {integrity: sha512-GMBAbW9antB8iZRHLoGw0b3HANt57diZYFO/HL1JGIC1MjKrdmhxvrJbupnVvpys0zsz7yBApXdQyfepKly2kA==} @@ -4268,6 +4519,12 @@ packages: engines: {node: '>=0.10.0'} dev: true + /getpass@0.1.7: + resolution: {integrity: sha512-0fzj9JxOLfJ+XGLhR8ze3unN0KZCgZwiSSDz168VERjK8Wl8kVSdcu2kspd4s4wtAa1y/qrVRiAA0WclVsu0ng==} + dependencies: + assert-plus: 1.0.0 + dev: false + /glob-parent@3.1.0: resolution: {integrity: sha512-E8Ak/2+dZY6fnzlR7+ueWvhsH1SjHr4jjss4YS/h4py44jY9MhK/VFdaZJAWDz6BbL21KeteKxFSFpq8OS5gVA==} dependencies: @@ -4374,6 +4631,20 @@ packages: through2: 2.0.5 dev: true + /har-schema@2.0.0: + resolution: {integrity: sha512-Oqluz6zhGX8cyRaTQlFMPw80bSJVG2x/cFb8ZPhUILGgHka9SsokCCOQgpveePerqidZOrT14ipqfJb7ILcW5Q==} + engines: {node: '>=4'} + dev: false + + /har-validator@5.1.5: + resolution: {integrity: sha512-nmT2T0lljbxdQZfspsno9hgrG3Uir6Ks5afism62poxqBM6sDnMEuPmzTq8XN0OEwqKLLdh1jQI3qyE66Nzb3w==} + engines: {node: '>=6'} + deprecated: this library is no longer supported + dependencies: + ajv: 6.12.6 + har-schema: 2.0.0 + dev: false + /hard-rejection@2.1.0: resolution: {integrity: sha512-VIZB+ibDhx7ObhAe7OVtoEbuP4h/MuOTHJ+J8h/eBXotJYl0fBgR72xDFCKgIh22OJZIOVNxBMWuhAr10r8HdA==} engines: {node: '>=6'} @@ -4459,6 +4730,15 @@ packages: lru-cache: 7.18.3 dev: true + /htmlparser2@8.0.2: + resolution: {integrity: sha512-GYdjWKDkbRLkZ5geuHs5NY1puJ+PXwP7+fHPRz06Eirsb9ugf6d8kkXav6ADhcODhFFPMIXyxkxSuMf3D6NCFA==} + dependencies: + domelementtype: 2.3.0 + domhandler: 5.0.3 + domutils: 3.1.0 + entities: 4.5.0 + dev: false + /http-assert@1.5.0: resolution: {integrity: sha512-uPpH7OKX4H25hBmU6G1jWNaqJGpTXxey+YOUizJUAgu0AjLUeC8D73hTrhvDS5D+GJN1DN1+hhc/eF/wpxtp0w==} engines: {node: '>= 0.8'} @@ -4527,6 +4807,15 @@ packages: - supports-color dev: true + /http-signature@1.2.0: + resolution: {integrity: sha512-CAbnr6Rz4CYQkLYUtSNXxQPUH2gK8f3iWexVlsnMeD+GjlsQ0Xsy1cOX+mN3dtxYomRy21CiOzU8Uhw6OwncEQ==} + engines: {node: '>=0.8', npm: '>=1.3.7'} + dependencies: + assert-plus: 1.0.0 + jsprim: 1.4.2 + sshpk: 1.18.0 + dev: false + /https-proxy-agent@5.0.1: resolution: {integrity: sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==} engines: {node: '>= 6'} @@ -4925,6 +5214,10 @@ packages: has-symbols: 1.0.3 dev: true + /is-typedarray@1.0.0: + resolution: {integrity: sha512-cyA56iCMHAh5CdzjJIa4aohJyeO1YbwLi3Jc35MmRU6poroFjIGZzUzupGiRPOjgHg9TLu43xbpwXk523fMxKA==} + dev: false + /is-unicode-supported@0.1.0: resolution: {integrity: sha512-knxG2q4UC3u8stRGyAVJCOdxFmv5DZiRcdlIaAQXAbSfJya+OhopNotLQrstBhququ4ZpuKbDc/8S6mgXgPFPw==} engines: {node: '>=10'} @@ -4934,6 +5227,10 @@ packages: resolution: {integrity: sha512-43r2mRvz+8JRIKnWJ+3j8JtjRKZ6GmjzfaE/qiBJnikNnYv/6bagRJ1kUhNk8R5EX/GkobD+r+sfxCPJsiKBLQ==} engines: {node: '>=12'} + /is-utf8@0.2.1: + resolution: {integrity: sha512-rMYPYvCzsXywIsldgLaSoPlw5PfoB/ssr7hY4pLfcodrA5M/eArza1a9VmTiNIBNMjOGr1Ow9mTyU2o69U6U9Q==} + dev: false + /is-weakref@1.0.2: resolution: {integrity: sha512-qctsuLZmIQ0+vSSMfoVvyFe2+GSEvnmZ2ezTup1SBse9+twCCeial6EEi3Nc2KFcf6+qz2FBPnjXsk8xhKSaPQ==} dependencies: @@ -4974,6 +5271,10 @@ packages: engines: {node: '>=0.10.0'} dev: true + /isstream@0.1.2: + resolution: {integrity: sha512-Yljz7ffyPbrLpLngrMtZ7NduUgVvi6wG9RJ9IUcyCd59YQ911PBJphODUcbOVbqYfxe1wuYf/LJ8PauMRwsM/g==} + dev: false + /jackspeak@2.2.2: resolution: {integrity: sha512-mgNtVv4vUuaKA97yxUHoA3+FkuhtxkjdXEWOyB/N76fjy0FjezEt34oy3epBtvCvS+7DyKwqCFWx/oJLV5+kCg==} engines: {node: '>=14'} @@ -5014,6 +5315,10 @@ packages: argparse: 2.0.1 dev: true + /jsbn@0.1.1: + resolution: {integrity: sha512-UVU9dibq2JcFWxQPA6KCqj5O42VOmAY3zQUfEKxU0KpTGXwNoCjkX1e13eHNvw/xPynt6pU0rZ1htjWTNTSXsg==} + dev: false + /json-diff@1.0.6: resolution: {integrity: sha512-tcFIPRdlc35YkYdGxcamJjllUhXWv4n2rK9oJ2RsAzV4FBkuV4ojKEDgcZ+kpKxDmJKv+PFK65+1tVVOnSeEqA==} hasBin: true @@ -5027,6 +5332,22 @@ packages: resolution: {integrity: sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==} dev: true + /json-schema-traverse@0.4.1: + resolution: {integrity: sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==} + dev: false + + /json-schema-traverse@1.0.0: + resolution: {integrity: sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==} + dev: false + + /json-schema@0.4.0: + resolution: {integrity: sha512-es94M3nTIfsEPisRafak+HDLfHXnKBhV3vU5eqPcS3flIWqcxJWgXHXiey3YrpaNsanY5ei1VoYEbOzijuq9BA==} + dev: false + + /json-stringify-safe@5.0.1: + resolution: {integrity: sha512-ZClg6AaYvamvYEE82d3Iyd3vSSIjQ+odgjaTzRuO3s7toCdFKczob2i0zCh7JE8kWn17yvAWhUVxvqGwUalsRA==} + dev: false + /jsonfile@4.0.0: resolution: {integrity: sha512-m6F1R3z8jjlf2imQHS2Qez5sjKWQzbuuhuJ/FKYFRZvPE3PuHcSMVZzfsLhGVOkfd20obL5SWEBew5ShlquNxg==} optionalDependencies: @@ -5041,7 +5362,6 @@ packages: /jsonpath-plus@4.0.0: resolution: {integrity: sha512-e0Jtg4KAzDJKKwzbLaUtinCn0RZseWBVRTRGihSpvFlM3wTR7ExSp+PTdeTsDrLNJUe7L7JYJe8mblHX5SCT6A==} engines: {node: '>=10.0'} - dev: true /jsonpath@1.1.1: resolution: {integrity: sha512-l6Cg7jRpixfbgoWgkrl77dgEj8RPvND0wMH6TwQmi9Qs4TFfS9u5cUFnbeKTwj5ga5Y3BTGGNI28k117LJ009w==} @@ -5051,6 +5371,16 @@ packages: underscore: 1.12.1 dev: true + /jsprim@1.4.2: + resolution: {integrity: sha512-P2bSOMAc/ciLz6DzgjVlGJP9+BrJWu5UDGK70C2iweC5QBIeFf0ZXRvGjEj2uYgrY2MkAAhsSWHDWlFtEroZWw==} + engines: {node: '>=0.6.0'} + dependencies: + assert-plus: 1.0.0 + extsprintf: 1.3.0 + json-schema: 0.4.0 + verror: 1.10.0 + dev: false + /keygrip@1.1.0: resolution: {integrity: sha512-iYSchDJ+liQ8iwbSI2QqsQOvqv58eJCEanyJPJi+Khyu8smkcKSFUCbPwzFcL7YVtZ6eONjqRX/38caJ7QjRAQ==} engines: {node: '>= 0.6'} @@ -5773,6 +6103,16 @@ packages: path-key: 3.1.1 dev: true + /nth-check@2.1.1: + resolution: {integrity: sha512-lqjrjmaOoAnWfMmBPL+XNnynZh2+swxiX3WUE0s4yEHI6m+AwrK2UZOimIRl3X/4QctVqS8AiZjFqyOGrMXb/w==} + dependencies: + boolbase: 1.0.0 + dev: false + + /oauth-sign@0.9.0: + resolution: {integrity: sha512-fexhUFFPTGV8ybAtSIGbV6gOkSv8UtRbDBnAyLQw4QPKkgNlsH2ByPGtMUqdWkos6YCRmAqViwgZrJc/mRDzZQ==} + dev: false + /object-assign@4.1.1: resolution: {integrity: sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==} engines: {node: '>=0.10.0'} @@ -6042,6 +6382,19 @@ packages: resolution: {integrity: sha512-Tpb8Z7r7XbbtBTrM9UhpkzzaMrqA2VXMT3YChzYltwV3P3pM6t8wl7TvpMnSTosz1aQAdVib7kdoys7vYOPerw==} engines: {node: '>=12'} + /parse5-htmlparser2-tree-adapter@7.0.0: + resolution: {integrity: sha512-B77tOZrqqfUfnVcOrUvfdLbz4pu4RopLD/4vmu3HUPswwTA8OH0EMW9BlWR2B0RCoiZRAHEUu7IxeP1Pd1UU+g==} + dependencies: + domhandler: 5.0.3 + parse5: 7.1.2 + dev: false + + /parse5@7.1.2: + resolution: {integrity: sha512-Czj1WaSVpaoj0wbhMzLmWD69anp2WH7FXMB9n1Sy8/ZFF9jolSQVMu1Ij5WIyGmcBmhk7EOndpO4mIpihVqAXw==} + dependencies: + entities: 4.5.0 + dev: false + /parseurl@1.3.3: resolution: {integrity: sha512-CiyeOxFT/JZyN5m0z9PfXw4SCBJ6Sygz1Dpl0wqjlhDEGGBP1GnsUVEL0p63hoG1fcj3fHynXi9NYO4nWOL+qQ==} engines: {node: '>= 0.8'} @@ -6124,6 +6477,10 @@ packages: through2: 2.0.5 dev: true + /performance-now@2.1.0: + resolution: {integrity: sha512-7EAHlyLHI56VEIdK57uwHdHKIaAGbnXPiw0yWbarQZOKaKpvUIgW0jWRVLiatnM+XXlSwsanIBH/hzGMJulMow==} + dev: false + /phoenix@1.7.7: resolution: {integrity: sha512-moAN6e4Z16x/x1nswUpnTR2v5gm7HsI7eluZ2YnYUUsBNzi3cY/5frmiJfXIEi877IQAafzTfp8hd6vEUMme+w==} dev: false @@ -6348,7 +6705,6 @@ packages: /proxy-from-env@1.1.0: resolution: {integrity: sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==} - dev: true /proxy-middleware@0.15.0: resolution: {integrity: sha512-EGCG8SeoIRVMhsqHQUdDigB2i7qU7fCsWASwn54+nPutYO8n4q6EiwMzyfWlC+dzRFExP+kvcnDFdBDHoZBU7Q==} @@ -6359,6 +6715,10 @@ packages: resolution: {integrity: sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==} dev: true + /psl@1.9.0: + resolution: {integrity: sha512-E/ZsdU4HLs/68gYzgGTkMicWTLPdAftJLfJFlLUAAKZGkStNU72sZjT66SnMDVOfOWY/YAoiD7Jxa9iHvngcag==} + dev: false + /pstree.remy@1.1.8: resolution: {integrity: sha512-77DZwxQmxKnu3aR542U+X8FypNzbfJ+C5XQDk3uWjWxn6151aIMGthWYRXTqT1E5oJvg+ljaa2OJi+VfvCOQ8w==} dev: true @@ -6381,7 +6741,6 @@ packages: /punycode@2.3.0: resolution: {integrity: sha512-rRV+zQD8tVFys26lAGR9WUuS4iUAngJScM+ZRSKtvl5tKeZ2t5bvdNFdNHBW9FWR4guGHlgmsZ1G7BSm2wTbuA==} engines: {node: '>=6'} - dev: true /qs@6.11.2: resolution: {integrity: sha512-tDNIz22aBzCDxLtVH++VnTfzxlfeK5CbqohpSqpJgj1Wg/cQbStNAz3NuqCs5vV+pjBsK4x4pN9HlVh7rcYRiA==} @@ -6390,6 +6749,11 @@ packages: side-channel: 1.0.4 dev: false + /qs@6.5.3: + resolution: {integrity: sha512-qxXIEh4pCGfHICj1mAJQ2/2XVZkjCDTcEgfoSQxc/fYivUZxTkk7L3bDBJSoNrEzXI17oUO5Dp07ktqE5KzczA==} + engines: {node: '>=0.6'} + dev: false + /query-string@8.1.0: resolution: {integrity: sha512-BFQeWxJOZxZGix7y+SByG3F36dA0AbTy9o6pSmKFcFz7DAj0re9Frkty3saBn3nHo3D0oZJ/+rx3r8H8r8Jbpw==} engines: {node: '>=14.16'} @@ -6399,6 +6763,10 @@ packages: split-on-first: 3.0.0 dev: true + /querystringify@2.2.0: + resolution: {integrity: sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==} + dev: false + /queue-microtask@1.2.3: resolution: {integrity: sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==} @@ -6583,14 +6951,50 @@ packages: engines: {node: '>=0.10'} dev: true + /request@2.88.2: + resolution: {integrity: sha512-MsvtOrfG9ZcrOwAW+Qi+F6HbD0CWXEh9ou77uOb7FM2WPhwT7smM833PzanhJLsgXjN89Ir6V2PczXNnMpwKhw==} + engines: {node: '>= 6'} + deprecated: request has been deprecated, see https://github.com/request/request/issues/3142 + dependencies: + aws-sign2: 0.7.0 + aws4: 1.12.0 + caseless: 0.12.0 + combined-stream: 1.0.8 + extend: 3.0.2 + forever-agent: 0.6.1 + form-data: 2.3.3 + har-validator: 5.1.5 + http-signature: 1.2.0 + is-typedarray: 1.0.0 + isstream: 0.1.2 + json-stringify-safe: 5.0.1 + mime-types: 2.1.35 + oauth-sign: 0.9.0 + performance-now: 2.1.0 + qs: 6.5.3 + safe-buffer: 5.2.1 + tough-cookie: 2.5.0 + tunnel-agent: 0.6.0 + uuid: 3.4.0 + dev: false + /require-directory@2.1.1: resolution: {integrity: sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==} engines: {node: '>=0.10.0'} + /require-from-string@2.0.2: + resolution: {integrity: sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==} + engines: {node: '>=0.10.0'} + dev: false + /require-main-filename@2.0.0: resolution: {integrity: sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==} dev: true + /requires-port@1.0.0: + resolution: {integrity: sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==} + dev: false + /resolve-cwd@3.0.0: resolution: {integrity: sha512-OrZaX2Mb+rJCpH/6CpSqt9xFVpN++x01XnN2ie9g6P5/3xelLAkXWVADpdz1IHD/KFfEXyE6V0U01OQ3UO2rEg==} engines: {node: '>=8'} @@ -7002,6 +7406,22 @@ packages: /sprintf-js@1.0.3: resolution: {integrity: sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==} + /sshpk@1.18.0: + resolution: {integrity: sha512-2p2KJZTSqQ/I3+HX42EpYOa2l3f8Erv8MWKsy2I9uf4wA7yFIkXRffYdsx86y6z4vHtV8u7g+pPlr8/4ouAxsQ==} + engines: {node: '>=0.10.0'} + hasBin: true + dependencies: + asn1: 0.2.6 + assert-plus: 1.0.0 + bcrypt-pbkdf: 1.0.2 + dashdash: 1.14.1 + ecc-jsbn: 0.1.2 + getpass: 0.1.7 + jsbn: 0.1.1 + safer-buffer: 2.1.2 + tweetnacl: 0.14.5 + dev: false + /ssri@10.0.4: resolution: {integrity: sha512-12+IR2CB2C28MMAw0Ncqwj5QbTcs0nGIhgJzYWzDkb21vWmfNI83KS4f3Ci6GI98WreIfG7o9UXp3C0qbpA8nQ==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} @@ -7110,6 +7530,13 @@ packages: dependencies: ansi-regex: 6.0.1 + /strip-bom@2.0.0: + resolution: {integrity: sha512-kwrX1y7czp1E69n2ajbG65mIo9dqvJ+8aBQXOGVxqwvNbsXdFM6Lq37dLAY3mknUwru8CfcCbfOLL/gMo+fi3g==} + engines: {node: '>=0.10.0'} + dependencies: + is-utf8: 0.2.1 + dev: false + /strip-bom@3.0.0: resolution: {integrity: sha512-vavAMRXOgBVNF6nyEEmL3DBK19iRpDcoIwW+swQ+CbGiu7lju6t+JklA1MHweoWtadgt4ISVUsXLyDq34ddcwA==} engines: {node: '>=4'} @@ -7326,6 +7753,24 @@ packages: nopt: 1.0.10 dev: true + /tough-cookie@2.5.0: + resolution: {integrity: sha512-nlLsUzgm1kfLXSXfRZMc1KLAugd4hqJHDTvc2hDIwS3mZAfMEuMbc03SujMF+GEcpaX/qboeycw6iO8JwVv2+g==} + engines: {node: '>=0.8'} + dependencies: + psl: 1.9.0 + punycode: 2.3.0 + dev: false + + /tough-cookie@4.1.3: + resolution: {integrity: sha512-aX/y5pVRkfRnfmuX+OdbSdXvPe6ieKX/G2s7e98f4poJHnqH3281gDPm/metm6E/WRamfx7WC4HUqkWHfQHprw==} + engines: {node: '>=6'} + dependencies: + psl: 1.9.0 + punycode: 2.3.0 + universalify: 0.2.0 + url-parse: 1.5.10 + dev: false + /tr46@0.0.3: resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==} dev: false @@ -7585,6 +8030,16 @@ packages: yargs: 17.7.2 dev: true + /tunnel-agent@0.6.0: + resolution: {integrity: sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==} + dependencies: + safe-buffer: 5.2.1 + dev: false + + /tweetnacl@0.14.5: + resolution: {integrity: sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA==} + dev: false + /type-check@0.3.2: resolution: {integrity: sha512-ZCmOJdvOWDBYJlzAoFkC+Q0+bUyEOS1ltgp1MGU03fqHG+dbi9tBFU2Rd9QKiDZFAYrhPh2JUf7rZRIuHRKtOg==} engines: {node: '>= 0.8.0'} @@ -7663,6 +8118,13 @@ packages: resolution: {integrity: sha512-hEQt0+ZLDVUMhebKxL4x1BTtDY7bavVofhZ9KZ4aI26X9SRaE+Y3m83XUL1UP2jn8ynjndwCCpEHdUG+9pP1Tw==} dev: true + /undici@5.27.2: + resolution: {integrity: sha512-iS857PdOEy/y3wlM3yRp+6SNQQ6xU0mmZcwRSriqk+et/cwWAtwmIGf6WkoDN2EK/AMdCO/dfXzIwi+rFMrjjQ==} + engines: {node: '>=14.0'} + dependencies: + '@fastify/busboy': 2.1.0 + dev: false + /union-value@1.0.1: resolution: {integrity: sha512-tJfXmxMeWYnczCVs7XAEvIV7ieppALdyepWMkHkwciRpZraG/xwT+s2JN8+pr1+8jCRf80FFzvr+MpQeeoF4Xg==} engines: {node: '>=0.10.0'} @@ -7692,6 +8154,11 @@ packages: engines: {node: '>= 4.0.0'} dev: true + /universalify@0.2.0: + resolution: {integrity: sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==} + engines: {node: '>= 4.0.0'} + dev: false + /unix-crypt-td-js@1.1.4: resolution: {integrity: sha512-8rMeVYWSIyccIJscb9NdCfZKSRBKYTeVnwmiRYT2ulE3qd1RaDQ0xQDP+rI3ccIWbhu/zuo5cgN8z73belNZgw==} dev: true @@ -7713,6 +8180,12 @@ packages: engines: {node: '>=4'} dev: true + /uri-js@4.4.1: + resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==} + dependencies: + punycode: 2.3.0 + dev: false + /urix@0.1.0: resolution: {integrity: sha512-Am1ousAhSLBeB9cG/7k7r2R0zj50uDRlZHPGbazid5s9rlF1F/QKYObEKSIunSjIOkJZqwRRLpvewjEkM7pSqg==} deprecated: Please see https://github.com/lydell/urix#deprecated @@ -7723,6 +8196,13 @@ packages: engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} dev: false + /url-parse@1.5.10: + resolution: {integrity: sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==} + dependencies: + querystringify: 2.2.0 + requires-port: 1.0.0 + dev: false + /use@3.1.1: resolution: {integrity: sha512-cwESVXlO3url9YWlFW/TA9cshCEhtu7IKJ/p5soJ/gGpj7vbvFrAY/eIioQ6Dw23KjZhYgiIo8HOs1nQ2vr/oQ==} engines: {node: '>=0.10.0'} @@ -7741,7 +8221,6 @@ packages: resolution: {integrity: sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==} deprecated: Please upgrade to version 7 or higher. Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details. hasBin: true - dev: true /v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} @@ -7764,6 +8243,15 @@ packages: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} + /verror@1.10.0: + resolution: {integrity: sha512-ZZKSmDAEFOijERBLkmYfJ+vmk3w+7hOLYDNkRCuRuMJGEmqYNCNLyBBFwWKVMhfwaEF3WOd0Zlw86U/WC/+nYw==} + engines: {'0': node >=0.6.0} + dependencies: + assert-plus: 1.0.0 + core-util-is: 1.0.2 + extsprintf: 1.3.0 + dev: false + /wcwidth@1.0.1: resolution: {integrity: sha512-XHPEwS0q6TaxcvG85+8EYkbiCux2XtWG2mkc47Ng2A77BQu9+DqIOJldST4HgPkuea7dvKSj5VgX3P1d4rW8Tg==} dependencies: From 34d7049bc6e1327e3d0df2b767b9553c338802de Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 10:45:25 +0000 Subject: [PATCH 13/22] runtime: don't publish job complete for trigger nodes --- packages/runtime/src/execute/job.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/runtime/src/execute/job.ts b/packages/runtime/src/execute/job.ts index 630ad55ca..a39f14717 100644 --- a/packages/runtime/src/execute/job.ts +++ b/packages/runtime/src/execute/job.ts @@ -156,16 +156,16 @@ const executeJob = async ( throw e; } } - } - if (!didError) { - next = calculateNext(job, result); - notify(NOTIFY_JOB_COMPLETE, { - duration: Date.now() - duration, - state: result, - jobId, - next, - }); + if (!didError) { + next = calculateNext(job, result); + notify(NOTIFY_JOB_COMPLETE, { + duration: Date.now() - duration, + state: result, + jobId, + next, + }); + } } return { next, state: result }; From 48de31a9ca19da6a359b0c7394595485131eddf8 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 11:24:47 +0000 Subject: [PATCH 14/22] worker: remove catch from jobComplete handler --- packages/ws-worker/src/api/execute.ts | 39 +++++++++++------------ packages/ws-worker/test/lightning.test.ts | 1 + 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index b21374bf1..1f7dfa902 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -266,27 +266,24 @@ export function onJobComplete( delete state.activeRun; delete state.activeJob; - try { - const { reason, error_message, error_type } = calculateJobExitReason( - job_id, - event.state, - error - ); - state.reasons[job_id] = { reason, error_message, error_type }; - - return sendEvent(channel, RUN_COMPLETE, { - run_id, - job_id, - output_dataclip_id: dataclipId, - output_dataclip: stringify(event.state), - - reason, - error_message, - error_type, - }); - } catch (e) { - console.log(e); - } + const { reason, error_message, error_type } = calculateJobExitReason( + job_id, + event.state, + error + ); + state.reasons[job_id] = { reason, error_message, error_type }; + + const evt = { + run_id, + job_id, + output_dataclip_id: dataclipId, + output_dataclip: stringify(event.state), + + reason, + error_message, + error_type, + }; + return sendEvent(channel, RUN_COMPLETE, evt); } export function onWorkflowStart( diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index f90f0d5c0..faf004527 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -395,6 +395,7 @@ test('should register and de-register attempts to the server', async (t) => { }); }); +// TODO this is a server test // What I am testing here is that the first job completes // before the second job starts test('should not claim while at capacity', async (t) => { From b6248354607d50b75b7b39a1178ac922fb05210a Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:15:17 +0000 Subject: [PATCH 15/22] worker: attempt state must be smarter about setting up initial dataclip --- packages/ws-worker/src/api/execute.ts | 45 +------- packages/ws-worker/src/types.d.ts | 16 +++ .../src/util/create-attempt-state.ts | 52 +++++++++ packages/ws-worker/src/util/index.ts | 9 +- packages/ws-worker/test/api/execute.test.ts | 23 +--- .../test/util/create-attempt-state.test.ts | 101 ++++++++++++++++++ 6 files changed, 181 insertions(+), 65 deletions(-) create mode 100644 packages/ws-worker/src/util/create-attempt-state.ts create mode 100644 packages/ws-worker/test/util/create-attempt-state.test.ts diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 1f7dfa902..311619f39 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -14,8 +14,8 @@ import { RUN_START, RUN_START_PAYLOAD, } from '../events'; -import { AttemptOptions, Channel, ExitReason } from '../types'; -import { getWithReply, stringify } from '../util'; +import { AttemptOptions, Channel, AttemptState } from '../types'; +import { getWithReply, stringify, createAttemptState } from '../util'; import type { JSONLog, Logger } from '@openfn/logger'; import type { @@ -31,21 +31,6 @@ import { calculateAttemptExitReason, calculateJobExitReason } from './reasons'; const enc = new TextDecoder('utf-8'); -export type AttemptState = { - activeRun?: string; - activeJob?: string; - plan: ExecutionPlan; - options: AttemptOptions; - dataclips: Record; - // For each run, map the input ids - // TODO better name maybe? - inputDataclips: Record; - reasons: Record; - - // final dataclip id - lastDataclipId?: string; -}; - export type Context = { channel: Channel; state: AttemptState; @@ -62,32 +47,6 @@ const eventMap = { 'workflow-complete': ATTEMPT_COMPLETE, }; -export const createAttemptState = ( - plan: ExecutionPlan, - options: AttemptOptions = {} -): AttemptState => { - const state = { - plan, - lastDataclipId: '', - dataclips: {}, - inputDataclips: {}, - reasons: {}, - options, - } as AttemptState; - - if (typeof plan.initialState === 'string') { - const startJobId = plan.start ?? plan.jobs[0].id; - // @ts-ignore - state.inputDataclips[startJobId] = plan.initialState; - } else { - // what if initial state is an object? - // In practice I don't think this will happen, - // but the first input_state_id will be messed up - } - - return state; -}; - // pass a web socket connected to the attempt channel // this thing will do all the work export function execute( diff --git a/packages/ws-worker/src/types.d.ts b/packages/ws-worker/src/types.d.ts index 5601b2cda..1d2b82f97 100644 --- a/packages/ws-worker/src/types.d.ts +++ b/packages/ws-worker/src/types.d.ts @@ -76,6 +76,22 @@ export type AttemptOptions = { sanitize?: SanitizePolicies; }; +// Internal server state for each attempt +export type AttemptState = { + activeRun?: string; + activeJob?: string; + plan: ExecutionPlan; + options: AttemptOptions; + dataclips: Record; + // For each run, map the input ids + // TODO better name maybe? + inputDataclips: Record; + reasons: Record; + + // final dataclip id + lastDataclipId?: string; +}; + export type CancelablePromise = Promise & { cancel: () => void; }; diff --git a/packages/ws-worker/src/util/create-attempt-state.ts b/packages/ws-worker/src/util/create-attempt-state.ts new file mode 100644 index 000000000..3aaff826b --- /dev/null +++ b/packages/ws-worker/src/util/create-attempt-state.ts @@ -0,0 +1,52 @@ +import type { ExecutionPlan } from '@openfn/runtime'; +import type { AttemptOptions, AttemptState } from '../types'; + +export default ( + plan: ExecutionPlan, + options: AttemptOptions = {} +): AttemptState => { + const state = { + plan, + lastDataclipId: '', + dataclips: {}, + inputDataclips: {}, + reasons: {}, + options, + } as AttemptState; + + if (typeof plan.initialState === 'string') { + // We need to initialise inputDataclips so that the first run + // has its inputDataclip set properly + // Difficulty: the starting node is a trigger and NOT a run + // We need to find the first job with a body downstream of the start + // and set the input state on THAT + + // find the first job + let startNode = plan.jobs[0]; + if (plan.start) { + startNode = plan.jobs.find(({ id }) => id === plan.start)!; + } + + // TODO throw with validation error of some kind if this node could not be found + + const initialRuns: string[] = []; + // If this is a trigger, get the downstream jobs + if (!startNode.expression) { + initialRuns.push(...Object.keys(startNode.next!)); + } else { + initialRuns.push(startNode.id!); + } + + // For any runs downstream of the initial state, + // Set up the input dataclip + initialRuns.forEach((id) => { + state.inputDataclips[id] = plan.initialState as string; + }); + } else { + // what if initial state is an object? + // In practice I don't think this will happen, + // but the first input_state_id will be messed up + } + + return state; +}; diff --git a/packages/ws-worker/src/util/index.ts b/packages/ws-worker/src/util/index.ts index 781e18394..241236a00 100644 --- a/packages/ws-worker/src/util/index.ts +++ b/packages/ws-worker/src/util/index.ts @@ -2,5 +2,12 @@ import convertAttempt from './convert-attempt'; import tryWithBackoff from './try-with-backoff'; import getWithReply from './get-with-reply'; import stringify from './stringify'; +import createAttemptState from './create-attempt-state'; -export { convertAttempt, tryWithBackoff, getWithReply, stringify }; +export { + convertAttempt, + tryWithBackoff, + getWithReply, + stringify, + createAttemptState, +}; diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index bfaac3289..aa968f616 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -17,16 +17,15 @@ import { execute, onWorkflowStart, onWorkflowComplete, - AttemptState, loadDataclip, loadCredential, sendEvent, - createAttemptState, } from '../../src/api/execute'; import createMockRTE from '../../src/mock/runtime-engine'; import { mockChannel } from '../../src/mock/sockets'; -import { stringify } from '../../src/util'; +import { stringify, createAttemptState } from '../../src/util'; import { ExecutionPlan } from '@openfn/runtime'; +import type { AttemptState } from '../../src/types'; const enc = new TextEncoder(); @@ -45,24 +44,6 @@ const mockEventHandlers = { // This is a nonsense timestamp but it's fine for the test (and easy to convert) const getBigIntTimestamp = () => (BigInt(Date.now()) * BigInt(1e6)).toString(); -test('createAttemptState: set initial input dataclip for job[0]', (t) => { - const plan = { initialState: 'x', jobs: [{ id: 'a' }] }; - const attempt = createAttemptState(plan); - - t.deepEqual(attempt.inputDataclips, { a: 'x' }); -}); - -test('createAttemptState: set initial input dataclip for attempt.start', (t) => { - const plan = { - initialState: 'x', - start: 'a', - jobs: [{ id: 'b' }, { id: 'a' }], - }; - const attempt = createAttemptState(plan); - - t.deepEqual(attempt.inputDataclips, { a: 'x' }); -}); - test('send event should resolve when the event is acknowledged', async (t) => { const channel = mockChannel({ echo: (x) => x, diff --git a/packages/ws-worker/test/util/create-attempt-state.test.ts b/packages/ws-worker/test/util/create-attempt-state.test.ts new file mode 100644 index 000000000..7a559d52c --- /dev/null +++ b/packages/ws-worker/test/util/create-attempt-state.test.ts @@ -0,0 +1,101 @@ +import test from 'ava'; + +import { createAttemptState } from '../../src/util'; + +test('create attempt', (t) => { + const options = { timeout: 666 }; + const plan = { jobs: [{ id: 'a' }] }; + const attempt = createAttemptState(plan, options); + + t.deepEqual(attempt.plan, plan); + t.deepEqual(attempt.lastDataclipId, ''); + t.deepEqual(attempt.dataclips, {}); + t.deepEqual(attempt.inputDataclips, {}); + t.deepEqual(attempt.reasons, {}); + t.deepEqual(attempt.options, options); +}); + +test('Set initial input dataclip if no explicit start and first job is a run', (t) => { + const plan = { initialState: 'x', jobs: [{ id: 'a', expression: '.' }] }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 'x' }); +}); + +test('Set initial input dataclip if the explicit start is a run', (t) => { + const plan = { + initialState: 'x', + start: 'a', + jobs: [ + { id: 'b', expression: '.' }, + { id: 'a', expression: '.' }, + ], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 'x' }); +}); + +test('Set initial input dataclip if the start is a trigger (simple)', (t) => { + const plan = { + initialState: 's', + start: 't', + jobs: [ + { id: 't', next: { a: true } }, + { id: 'a', expression: '.' }, + ], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 's' }); +}); + +test('Set initial input dataclip if the start is a trigger (complex)', (t) => { + const plan = { + initialState: 's', + start: 't', + jobs: [ + { id: 'a', expression: '.' }, + { id: 'b', expression: '.' }, + { id: 'c', expression: '.' }, + { id: 'd', expression: '.' }, + { id: 't', next: { c: true } }, + ], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { c: 's' }); +}); + +test('Set initial input dataclip with a trigger as implicit start', (t) => { + const plan = { + initialState: 's', + jobs: [ + { id: 't', next: { c: true } }, + { id: 'a', expression: '.' }, + { id: 'b', expression: '.' }, + { id: 'c', expression: '.' }, + { id: 'd', expression: '.' }, + ], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { c: 's' }); +}); + +test('Set initial input dataclip with a trigger with multiple downstream jobs', (t) => { + const plan = { + initialState: 's', + start: 't', + jobs: [ + { id: 'a', expression: '.' }, + { id: 'b', expression: '.' }, + { id: 't', next: { a: true, b: true, c: true } }, + { id: 'c', expression: '.' }, + { id: 'd', expression: '.' }, + ], + }; + const attempt = createAttemptState(plan); + + t.deepEqual(attempt.inputDataclips, { a: 's', b: 's', c: 's' }); +}); From 089d48195b1fc847f558dbb826152c561e185bcf Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:20:01 +0000 Subject: [PATCH 16/22] tests: add worker tests to cover trigger nodes --- integration-tests/worker/package.json | 2 +- integration-tests/worker/src/factories.ts | 23 +++++--- integration-tests/worker/src/init.ts | 2 +- .../worker/test/attempts.test.ts | 52 +++++++++++++++++-- .../worker/test/exit-reasons.test.ts | 2 +- .../worker/test/integration.test.ts | 2 +- 6 files changed, 69 insertions(+), 14 deletions(-) diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 1b0446502..d31529a29 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -7,7 +7,7 @@ "license": "ISC", "type": "module", "scripts": { - "clean": "rimraf dist tmp/repo/*", + "clean": "rimraf dist tmp/repo/**", "build:pack": "pnpm clean && cd ../.. && pnpm pack:local integration-tests/worker/dist --no-version", "build": "pnpm build:pack && docker build --tag worker-integration-tests .", "start": "docker run worker-integration-tests", diff --git a/integration-tests/worker/src/factories.ts b/integration-tests/worker/src/factories.ts index f39729275..55d359aa8 100644 --- a/integration-tests/worker/src/factories.ts +++ b/integration-tests/worker/src/factories.ts @@ -1,7 +1,8 @@ import crypto from 'node:crypto'; -export const createAttempt = (jobs, edges, args) => ({ +export const createAttempt = (triggers, jobs, edges, args) => ({ id: crypto.randomUUID(), + triggers, jobs, edges, ...args, @@ -10,15 +11,25 @@ export const createAttempt = (jobs, edges, args) => ({ export const createJob = (args) => ({ id: crypto.randomUUID(), adaptor: '@openfn/language-common@latest', - expression: 'fn((s) => s)', + body: 'fn((s) => s)', ...args, }); -export const createEdge = (a: any, b: any, condition?: string) => ({ +export const createTrigger = () => ({ id: crypto.randomUUID(), - source_job_id: a.id, - target_job_id: b.id, - // condition, }); +export const createEdge = (a: any, b: any, condition?: string) => { + const edge: any = { + id: crypto.randomUUID(), + target_job_id: b.id, + }; + if (!a.body) { + edge.source_trigger_id = a.id; + } else { + edge.source_job_id = a.id; + } + return edge; +}; + export default createAttempt; diff --git a/integration-tests/worker/src/init.ts b/integration-tests/worker/src/init.ts index 2c57c2e61..0fb084332 100644 --- a/integration-tests/worker/src/init.ts +++ b/integration-tests/worker/src/init.ts @@ -18,7 +18,7 @@ export const initWorker = async (lightningPort, engineArgs = {}) => { const workerPort = randomPort(); const engine = await createEngine({ - logger: createLogger('engine', { level: 'debug' }), + // logger: createLogger('engine', { level: 'debug' }), logger: createMockLogger(), repoDir: path.resolve('./tmp/repo/default'), ...engineArgs, diff --git a/integration-tests/worker/test/attempts.test.ts b/integration-tests/worker/test/attempts.test.ts index a8c35a599..078ca4d49 100644 --- a/integration-tests/worker/test/attempts.test.ts +++ b/integration-tests/worker/test/attempts.test.ts @@ -1,7 +1,12 @@ import test from 'ava'; import path from 'node:path'; -import { createAttempt, createEdge, createJob } from '../src/factories'; +import { + createAttempt, + createEdge, + createJob, + createTrigger, +} from '../src/factories'; import { initLightning, initWorker } from '../src/init'; let lightning; @@ -13,7 +18,7 @@ test.before(async () => { lightning = initLightning(lightningPort); ({ worker } = await initWorker(lightningPort, { - repoDir: path.resolve('tmp/openfn/repo/attempts'), + repoDir: path.resolve('tmp/repo/attempts'), })); }); @@ -43,7 +48,7 @@ test('echo initial state', async (t) => { lightning.addDataclip('s1', initialState); const job = createJob({ body: 'fn((s) => s)' }); - const attempt = createAttempt([job], [], { + const attempt = createAttempt([], [job], [], { dataclip_id: 's1', }); @@ -56,6 +61,45 @@ test('echo initial state', async (t) => { }); }); +test('start from a trigger node', async (t) => { + let runStartEvent; + let runCompleteEvent; + + const initialState = { data: { count: 22 } }; + + lightning.addDataclip('s1', initialState); + + const trigger = createTrigger(); + const job = createJob({ body: 'fn((s) => s)' }); + const edge = createEdge(trigger, job); + const attempt = createAttempt([trigger], [job], [edge], { + dataclip_id: 's1', + }); + + lightning.once('run:start', (evt) => { + runStartEvent = evt.payload; + }); + + lightning.once('run:complete', (evt) => { + runCompleteEvent = evt.payload; + }); + + await run(attempt); + + t.truthy(runStartEvent); + t.is(runStartEvent.job_id, job.id); + t.truthy(runStartEvent.run_id); + t.is(runStartEvent.input_dataclip_id, 's1'); + + t.truthy(runCompleteEvent); + t.is(runCompleteEvent.reason, 'success'); + t.is(runCompleteEvent.error_message, null); + t.is(runCompleteEvent.error_type, null); + t.is(runCompleteEvent.job_id, job.id); + t.truthy(runCompleteEvent.output_dataclip_id); + t.is(runCompleteEvent.output_dataclip, JSON.stringify(initialState)); +}); + // hmm this event feels a bit fine-grained for this // This file should just be about input-output // TODO maybe move it into integrations later @@ -77,7 +121,7 @@ test('run parallel jobs', async (t) => { const jobs = [a, x, y]; const edges = [ax, ay]; - const attempt = createAttempt(jobs, edges, { + const attempt = createAttempt([], jobs, edges, { dataclip_id: 's1', }); diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index 550b1ac99..8700fbeeb 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -13,7 +13,7 @@ test.before(async () => { lightning = initLightning(lightningPort); ({ worker } = await initWorker(lightningPort, { - repoDir: path.resolve('tmp/openfn/repo/exit-reason'), + repoDir: path.resolve('tmp/repo/exit-reason'), })); }); diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index a776dd230..272b6692a 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -16,7 +16,7 @@ test.before(async () => { ({ worker, engine } = await initWorker(lightningPort, { maxWorkers: 1, purge: false, - repoDir: path.resolve('tmp/openfn/repo/integration'), + repoDir: path.resolve('tmp/repo/integration'), })); }); From 4ec212692447b46459f6bb3b8aa62a436f6c6644 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:35:32 +0000 Subject: [PATCH 17/22] runtime: return next for trigger nodes, and add tests --- packages/runtime/src/execute/job.ts | 3 + packages/runtime/test/execute/job.test.ts | 82 +++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/packages/runtime/src/execute/job.ts b/packages/runtime/src/execute/job.ts index a39f14717..cb58b9594 100644 --- a/packages/runtime/src/execute/job.ts +++ b/packages/runtime/src/execute/job.ts @@ -166,6 +166,9 @@ const executeJob = async ( next, }); } + } else { + // calculate next for trigger nodes + next = calculateNext(job, result); } return { next, state: result }; diff --git a/packages/runtime/test/execute/job.test.ts b/packages/runtime/test/execute/job.test.ts index 0522e19bc..23c45dc8e 100644 --- a/packages/runtime/test/execute/job.test.ts +++ b/packages/runtime/test/execute/job.test.ts @@ -31,6 +31,52 @@ test.afterEach(() => { logger._reset(); }); +test('resolve and return next for a simple job', async (t) => { + const job = { + id: 'j', + expression: [(s: State) => s], + next: { k: true, a: false }, + }; + const initialState = createState(); + const context = createContext(); + const { next, state } = await execute(context, job, initialState); + + t.deepEqual(state, { data: {} }); + t.deepEqual(next, ['k']); +}); + +test('resolve and return next for a trigger-style job', async (t) => { + const job = { + id: 'j', + next: { k: true, a: false }, + }; + const initialState = createState(); + const context = createContext(); + const { next, state } = await execute(context, job, initialState); + + t.deepEqual(state, initialState); + t.deepEqual(next, ['k']); +}); + +test('resolve and return next for a failed job', async (t) => { + const job = { + id: 'j', + expression: [ + () => { + throw 'e'; + }, + ], + next: { k: true, a: false }, + }; + const initialState = createState(); + const context = createContext(); + const { next, state } = await execute(context, job, initialState); + + // Config should still be scrubbed from data + t.deepEqual(state, { data: {} }); + t.deepEqual(next, ['k']); +}); + test(`notify ${NOTIFY_JOB_START}`, async (t) => { const job = { id: 'j', @@ -49,6 +95,24 @@ test(`notify ${NOTIFY_JOB_START}`, async (t) => { await execute(context, job, state); }); +test(`don't notify ${NOTIFY_JOB_START} for trigger-style jobs`, async (t) => { + const job = { + id: 'j', + }; + const state = createState(); + + const notify = (event: string, payload?: any) => { + if (event === NOTIFY_JOB_START) { + t.fail('should not notify job-start for trigger nodes'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); + t.pass('all ok'); +}); + test(`notify ${NOTIFY_JOB_COMPLETE} with no next`, async (t) => { const job = { id: 'j', @@ -100,6 +164,24 @@ test(`notify ${NOTIFY_JOB_COMPLETE} with two nexts`, async (t) => { await execute(context, job, state); }); +test(`don't notify ${NOTIFY_JOB_COMPLETE} for trigger-style jobs`, async (t) => { + const job = { + id: 'j', + }; + const state = createState(); + + const notify = (event: string) => { + if (event === NOTIFY_JOB_COMPLETE) { + t.fail('should not notify job-start for trigger nodes'); + } + }; + + const context = createContext({ notify }); + + await execute(context, job, state); + t.pass('all ok'); +}); + test(`notify ${NOTIFY_JOB_COMPLETE} should publish serializable state`, async (t) => { // Promises will trigger an exception if you try to serialize them // If we don't return finalState in execute/expression, this test will fail From 0fb2d580f45243c685f36287f90f990539f20f60 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:36:34 +0000 Subject: [PATCH 18/22] changeset --- .changeset/fluffy-scissors-battle.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fluffy-scissors-battle.md diff --git a/.changeset/fluffy-scissors-battle.md b/.changeset/fluffy-scissors-battle.md new file mode 100644 index 000000000..3cebba8fe --- /dev/null +++ b/.changeset/fluffy-scissors-battle.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +correctly handle input_dataclip_id for runs From 85876003ed85b2d1c54b0735a16346b56fdcea7d Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:39:20 +0000 Subject: [PATCH 19/22] runtime: make timed tests more lenient --- packages/runtime/test/execute/job.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/runtime/test/execute/job.test.ts b/packages/runtime/test/execute/job.test.ts index 23c45dc8e..f211eda67 100644 --- a/packages/runtime/test/execute/job.test.ts +++ b/packages/runtime/test/execute/job.test.ts @@ -128,7 +128,7 @@ test(`notify ${NOTIFY_JOB_COMPLETE} with no next`, async (t) => { t.deepEqual(state, state); t.deepEqual(next, []); t.assert(!isNaN(duration)); - t.true(duration < 10); + t.true(duration < 100); t.is(jobId, 'j'); } }; @@ -154,7 +154,7 @@ test(`notify ${NOTIFY_JOB_COMPLETE} with two nexts`, async (t) => { t.deepEqual(state, state); t.deepEqual(next, ['b', 'c']); t.assert(!isNaN(duration)); - t.true(duration < 10); + t.true(duration < 100); t.is(jobId, 'j'); } }; @@ -230,7 +230,7 @@ test(`notify ${NOTIFY_JOB_ERROR} for a fail`, async (t) => { t.deepEqual(state, state); t.deepEqual(next, ['b']); t.assert(!isNaN(duration)); - t.true(duration < 10); + t.true(duration < 100); t.is(jobId, 'j'); } }; From 2f930c03c31f50eee125a03e2764fe946e0a20c1 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:46:30 +0000 Subject: [PATCH 20/22] worker: typings --- packages/ws-worker/src/api/reasons.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/ws-worker/src/api/reasons.ts b/packages/ws-worker/src/api/reasons.ts index 72b9b4788..3c259d5a8 100644 --- a/packages/ws-worker/src/api/reasons.ts +++ b/packages/ws-worker/src/api/reasons.ts @@ -1,5 +1,9 @@ -import { AttemptState } from './execute'; -import type { ExitReason, ExitReasonStrings, State } from '../types'; +import type { + ExitReason, + ExitReasonStrings, + State, + AttemptState, +} from '../types'; import type { JobNode } from '@openfn/runtime'; @@ -43,11 +47,11 @@ const calculateAttemptExitReason = (state: AttemptState) => { // basically becomes the exit reason // So If we get here, we basically just need to look to see if there's a fail on a leaf node // (we ignore fails on non-leaf nodes) - const leafJobReasons = state.plan.jobs - .filter((job) => isLeafNode(state, job)) + const leafJobReasons: ExitReason[] = state.plan.jobs + .filter((job: JobNode) => isLeafNode(state, job)) // TODO what if somehow there is no exit reason for a job? // This implies some kind of exception error, no? - .map(({ id }) => state.reasons[id!]); + .map(({ id }: JobNode) => state.reasons[id!]); const fail = leafJobReasons.find((r) => r && r.reason === 'fail'); if (fail) { From c1717167893086e7fe22957bfccf739849d00f27 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:57:20 +0000 Subject: [PATCH 21/22] worker: fix test --- packages/ws-worker/test/api/execute.test.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index aa968f616..1bf98c1b1 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -66,7 +66,7 @@ test('send event should throw if the event errors', async (t) => { }); test('jobStart should set a run id and active job on state', async (t) => { - const plan = { id: 'attempt-1' }; + const plan = { id: 'attempt-1', jobs: [{ id: 'job-1' }] }; const jobId = 'job-1'; const state = createAttemptState(plan); @@ -85,7 +85,10 @@ test('jobStart should send a run:start event', async (t) => { const plan = { id: 'attempt-1', initialState: 'abc', - jobs: [{ id: 'job-1' }, { id: 'job-2' }], + jobs: [ + { id: 'job-1', expression: '.' }, + { id: 'job-2', expression: '.' }, + ], }; const jobId = 'job-1'; From ea41bd7ba2f42f50a8f0d27c785438318a98fd06 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 15 Nov 2023 12:58:39 +0000 Subject: [PATCH 22/22] version bumps: worker@0.2.6. cli@0.4.7 --- .changeset/chilly-gorillas-poke.md | 5 ----- .changeset/fluffy-scissors-battle.md | 5 ----- .changeset/shaggy-cups-reply.md | 5 ----- integration-tests/worker/CHANGELOG.md | 10 ++++++++++ integration-tests/worker/package.json | 2 +- packages/cli/CHANGELOG.md | 7 +++++++ packages/cli/package.json | 2 +- packages/engine-multi/CHANGELOG.md | 8 ++++++++ packages/engine-multi/package.json | 2 +- packages/lightning-mock/CHANGELOG.md | 9 +++++++++ packages/lightning-mock/package.json | 2 +- packages/runtime/CHANGELOG.md | 6 ++++++ packages/runtime/package.json | 2 +- packages/ws-worker/CHANGELOG.md | 10 ++++++++++ packages/ws-worker/package.json | 2 +- 15 files changed, 56 insertions(+), 21 deletions(-) delete mode 100644 .changeset/chilly-gorillas-poke.md delete mode 100644 .changeset/fluffy-scissors-battle.md delete mode 100644 .changeset/shaggy-cups-reply.md diff --git a/.changeset/chilly-gorillas-poke.md b/.changeset/chilly-gorillas-poke.md deleted file mode 100644 index 91839e6e2..000000000 --- a/.changeset/chilly-gorillas-poke.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/engine-multi': patch ---- - -Forward next from job complete diff --git a/.changeset/fluffy-scissors-battle.md b/.changeset/fluffy-scissors-battle.md deleted file mode 100644 index 3cebba8fe..000000000 --- a/.changeset/fluffy-scissors-battle.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/ws-worker': patch ---- - -correctly handle input_dataclip_id for runs diff --git a/.changeset/shaggy-cups-reply.md b/.changeset/shaggy-cups-reply.md deleted file mode 100644 index 9d744371e..000000000 --- a/.changeset/shaggy-cups-reply.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@openfn/runtime': patch ---- - -Broadcast next steps with job-complete and error events diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 85ee5b45a..7e7c5cab4 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/integration-tests-worker +## 1.0.16 + +### Patch Changes + +- Updated dependencies [c8e9d51] +- Updated dependencies [0fb2d58] + - @openfn/engine-multi@0.1.10 + - @openfn/ws-worker@0.2.6 + - @openfn/lightning-mock@1.0.11 + ## 1.0.15 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index d31529a29..e3d8a8924 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.15", + "version": "1.0.16", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index dd0a9a335..99e781d65 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/cli +## 0.4.7 + +### Patch Changes + +- Updated dependencies [7f352d2] + - @openfn/runtime@0.1.3 + ## 0.4.6 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index eec9e2c89..ded481688 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "0.4.6", + "version": "0.4.7", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 079a56127..0b4b88b69 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,13 @@ # engine-multi +## 0.1.10 + +### Patch Changes + +- c8e9d51: Forward next from job complete +- Updated dependencies [7f352d2] + - @openfn/runtime@0.1.3 + ## 0.1.9 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 6075d9c4f..346c4b164 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "0.1.9", + "version": "0.1.10", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index fde5adf79..b11a1ef37 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/lightning-mock +## 1.0.11 + +### Patch Changes + +- Updated dependencies [c8e9d51] +- Updated dependencies [7f352d2] + - @openfn/engine-multi@0.1.10 + - @openfn/runtime@0.1.3 + ## 1.0.10 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 0f5209034..37563d355 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "1.0.10", + "version": "1.0.11", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 48ee04e8d..cc9021aa0 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 0.1.3 + +### Patch Changes + +- 7f352d2: Broadcast next steps with job-complete and error events + ## 0.1.2 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index bcee8acb4..b867b4ae9 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "0.1.2", + "version": "0.1.3", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 94f29b069..6d58c4d2b 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,15 @@ # ws-worker +## 0.2.6 + +### Patch Changes + +- 0fb2d58: correctly handle input_dataclip_id for runs +- Updated dependencies [c8e9d51] +- Updated dependencies [7f352d2] + - @openfn/engine-multi@0.1.10 + - @openfn/runtime@0.1.3 + ## 0.2.5 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index f6d47b3b3..1bd0994e9 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "0.2.5", + "version": "0.2.6", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module",