Skip to content

Commit

Permalink
worker: typings
Browse files Browse the repository at this point in the history
  • Loading branch information
josephjclark committed Feb 7, 2024
1 parent 8176558 commit 6b5583b
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 134 deletions.
20 changes: 12 additions & 8 deletions packages/lexicon/core.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { SanitizePolicies } from '@openfn/logger';

/**
* An execution plan is a portable definition of a Work Order,
* or, a unit of work to execute
Expand Down Expand Up @@ -76,6 +78,9 @@ export type WorkflowOptions = {
timeout?: number;
stepTimeout?: number;
start?: StepId;

// TODO not supported yet I don't think?
sanitize?: SanitizePolicies;
};

export type StepId = string;
Expand All @@ -96,14 +101,13 @@ export interface Step {
* Not actually keen on the node/edge semantics here
* Maybe StepLink?
*/
export type StepEdge =
| boolean
| string
| {
condition?: string; // Javascript expression (function body, not function)
label?: string;
disabled?: boolean;
};
export type StepEdge = boolean | string | ConditionalStepEdge;

export type ConditionalStepEdge = {
condition?: string; // Javascript expression (function body, not function)
label?: string;
disabled?: boolean;
};

/**
* A no-op type of Step
Expand Down
30 changes: 25 additions & 5 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import type { SanitizePolicies } from '@openfn/logger';

type StepId = string;

/**
* Type definitions for Lightning and Worker interfaces
*
Expand All @@ -9,25 +13,40 @@
// An run object returned by Lightning
export type Run = {
id: string;
name?: string;
dataclip_id: string;
starting_node_id: string;

triggers: Node[];
jobs: Node[];
edges: Edge[];

options?: Record<string, any>; // TODO type the expected options
options?: RunOptions;
};

/**
* These are options that can be sent to the worker with an execution plan
* They broadly map to the Workflow Options that are fed straight into the runtime
* and saved to the plan itself
* (although at the time of writing timeout is handled by the worker, not the runtime)
*/
export type RunOptions = {
runTimeoutMs?: number;
sanitize?: SanitizePolicies;
start?: StepId;
};

// TODO rename to step
// maybe also split into jobs and triggers
export type Node = {
id: string;
name?: string;
body?: string;
adaptor?: string;
credential?: any; // TODO tighten this up, string or object
credential?: any;
credential_id?: string;
type?: 'webhook' | 'cron'; // trigger only
state?: any; // Initial state / defaults
state?: State;
};

export interface Edge {
Expand All @@ -39,11 +58,12 @@ export interface Edge {
condition?: string;
error_path?: boolean;
errors?: any;
enabled?: boolean;
}

export type DataClip = object;
export type DataClip = Record<string, any>;

export type Credential = object;
export type Credential = Record<string, any>;

export type ExitReasonStrings =
| 'success'
Expand Down
3 changes: 3 additions & 0 deletions packages/lexicon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@
"types": "./lightning.d/.ts"
}
}
},
"devDependencies": {
"@openfn/logger": "workspace:^"
}
}
19 changes: 10 additions & 9 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ExecutionPlan } from '@openfn/lexicon';
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
RunLogPayload,
RunStartPayload,
Expand Down Expand Up @@ -62,12 +62,13 @@ export function execute(
engine: RuntimeEngine,
logger: Logger,
plan: ExecutionPlan,
input: Lazy<State>,
options: RunOptions = {},
onFinish = (_result: any) => {}
) {
logger.info('executing ', plan.id);

const state = createRunState(plan, options);
const state = createRunState(plan, input);

const context: Context = { channel, state, logger, engine, onFinish };

Expand Down Expand Up @@ -135,18 +136,18 @@ export function execute(
// TODO we need to remove this from here and let the runtime take care of it through
// the resolver. See https://github.com/OpenFn/kit/issues/403
// TODO come back and work out how initial state will work
if (typeof plan.initialState === 'string') {
logger.debug('loading dataclip', plan.initialState);
plan.initialState = await loadDataclip(channel, plan.initialState);
if (typeof input === 'string') {
logger.debug('loading dataclip', input);
const loadedInput = await loadDataclip(channel, input);
logger.success('dataclip loaded');
logger.debug(plan.initialState);
return loadedInput;
}
return plan;
return input;
})
// Execute (which we have to wrap in a promise chain to handle initial state)
.then(() => {
.then((input: State) => {
try {
engine.execute(plan, { resolvers, ...options });
engine.execute(plan, input, { resolvers, ...options });
} catch (e: any) {
// TODO what if there's an error?
handleRunError(context, {
Expand Down
19 changes: 7 additions & 12 deletions packages/ws-worker/src/api/reasons.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import type {
ExitReason,
ExitReasonStrings,
State,
RunState,
} from '../types';

import type { JobNode } from '@openfn/runtime';
import { State, Step } from '@openfn/lexicon';
import { ExitReason, ExitReasonStrings } from '@openfn/lexicon/lightning';
import type { RunState } from '../types';

// This takes the result state and error from the job
const calculateJobExitReason = (
Expand All @@ -30,7 +25,7 @@ const calculateJobExitReason = (
};

// It has next jobs, but they weren't executed
const isLeafNode = (state: RunState, job: JobNode) => {
const isLeafNode = (state: RunState, job: Step) => {
// A node is a leaf if:
// It has no `next` jobs at all
if (!job.next || Object.keys(job.next).length == 0) {
Expand All @@ -47,11 +42,11 @@ const calculateRunExitReason = (state: RunState): ExitReason => {
// basically becomes the exit reason
// So If we get here, we basically just need to look to see if there's a fail on a leaf node
// (we ignore fails on non-leaf nodes)
const leafJobReasons: ExitReason[] = state.plan.jobs
.filter((job: JobNode) => isLeafNode(state, job))
const leafJobReasons: ExitReason[] = state.plan.workflow.steps
.filter((job) => isLeafNode(state, job))
// TODO what if somehow there is no exit reason for a job?
// This implies some kind of exception error, no?
.map(({ id }: JobNode) => state.reasons[id!]);
.map(({ id }) => state.reasons[id!]);

const fail = leafJobReasons.find((r) => r && r.reason === 'fail');
if (fail) {
Expand Down
18 changes: 10 additions & 8 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import convertRun from '../util/convert-run';
import { getWithReply } from '../util';
import { Run, RunOptions, Channel, Socket } from '../types';
import { ExecutionPlan } from '@openfn/runtime';
import { GET_PLAN, GetPlanReply } from '../events';

import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type { GetPlanReply, Run, RunOptions } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';

import { getWithReply } from '../util';
import convertRun from '../util/convert-run';
import { GET_PLAN } from '../events';
import type { Channel, Socket } from '../types';

// TODO what happens if this channel join fails?
// Lightning could vanish, channel could error on its side, or auth could be wrong
// We don't have a good feedback mechanism yet - worker:queue is the only channel
Expand All @@ -21,6 +22,7 @@ const joinRunChannel = (
channel: Channel;
plan: ExecutionPlan;
options: RunOptions;
input: Lazy<State>;
}>((resolve, reject) => {
// TMP - lightning seems to be sending two responses to me
// just for now, I'm gonna gate the handling here
Expand All @@ -36,9 +38,9 @@ const joinRunChannel = (
if (!didReceiveOk) {
didReceiveOk = true;
logger.success(`connected to ${channelName}`, e);
const { plan, options } = await loadRun(channel);
const { plan, options, input } = await loadRun(channel);
logger.debug('converted run as execution plan:', plan);
resolve({ channel, plan, options });
resolve({ channel, plan, options, input });
}
})
.receive('error', (err: any) => {
Expand Down
3 changes: 2 additions & 1 deletion packages/ws-worker/src/events/run-complete.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { WorkflowCompletePayload } from '@openfn/engine-multi';
import type { RunCompletePayload } from '@openfn/lexicon/lightning';

import { RUN_COMPLETE, RunCompletePayload } from '../events';
import { RUN_COMPLETE } from '../events';
import { calculateRunExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';
import logFinalReason from '../util/log-final-reason';
Expand Down
6 changes: 3 additions & 3 deletions packages/ws-worker/src/events/run-error.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { calculateJobExitReason } from '../api/reasons';

import type { RunCompletePayload } from '@openfn/lexicon/lightning';
import type { WorkflowErrorPayload } from '@openfn/engine-multi';

import { RUN_COMPLETE, RunCompletePayload } from '../events';
import { calculateJobExitReason } from '../api/reasons';
import { RUN_COMPLETE } from '../events';
import { sendEvent, Context, onJobError } from '../api/execute';
import logFinalReason from '../util/log-final-reason';

Expand Down
6 changes: 3 additions & 3 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import crypto from 'node:crypto';
import type { StepCompletePayload } from '@openfn/lexicon/lightning';
import type { JobCompletePayload } from '@openfn/engine-multi';

import { STEP_COMPLETE, StepCompletePayload } from '../events';
import { STEP_COMPLETE } from '../events';
import { stringify } from '../util';
import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';

import type { JobCompletePayload } from '@openfn/engine-multi';

export default function onStepComplete(
{ channel, state }: Context,
event: JobCompletePayload,
Expand Down
10 changes: 7 additions & 3 deletions packages/ws-worker/src/events/step-start.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import crypto from 'node:crypto';
import { JobStartPayload } from '@openfn/engine-multi';
import { timestamp } from '@openfn/logger';
import { JobStartPayload } from '@openfn/engine-multi';
import type { Job } from '@openfn/lexicon';
import type { StepStartPayload } from '@openfn/lexicon/lightning';

import pkg from '../../package.json' assert { type: 'json' };
import { STEP_START, StepStartPayload } from '../events';
import { STEP_START } from '../events';
import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';

Expand All @@ -20,7 +22,9 @@ export default async function onStepStart(
state.activeStep = crypto.randomUUID();
state.activeJob = event.jobId;

const job = state.plan.jobs.find(({ id }) => id === event.jobId);
const job = state.plan.workflow.steps.find(
({ id }) => id === event.jobId
) as Job;

const input_dataclip_id = state.inputDataclips[event.jobId];

Expand Down
3 changes: 2 additions & 1 deletion packages/ws-worker/src/mock/resolvers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { State, Credential } from '../types';
import type { State } from '@openfn/lexicon';
import type { Credential } from '@openfn/lexicon/lightning';
import { Resolvers } from '@openfn/engine-multi';

const mockResolveCredential = (_credId: string) =>
Expand Down
9 changes: 6 additions & 3 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { EventEmitter } from 'node:events';
import crypto from 'node:crypto';
import run, { ExecutionPlan } from '@openfn/runtime';
import run from '@openfn/runtime';
import * as engine from '@openfn/engine-multi';
import type { ExecutionPlan, Job } from '@openfn/lexicon';

import mockResolvers from './resolvers';

Expand Down Expand Up @@ -79,12 +80,14 @@ async function createMock() {
resolvers: mockResolvers,
}
) => {
const { id, jobs } = xplan;
const { id } = xplan;
const { steps } = xplan.workflow;
activeWorkflows[id!] = true;

const threadId = crypto.randomUUID();

for (const job of jobs) {
for (const step of steps) {
const job = step as Job;
if (typeof job.configuration === 'string') {
// Call the crendtial callback, but don't do anything with it
job.configuration = await options.resolvers?.credential?.(
Expand Down
6 changes: 4 additions & 2 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import koaLogger from 'koa-logger';
import Router from '@koa/router';
import { humanId } from 'human-id';
import { createMockLogger, Logger } from '@openfn/logger';

import { INTERNAL_RUN_COMPLETE, ClaimRun } from './events';
import { ClaimRun } from '@openfn/lexicon/lightning';
import { INTERNAL_RUN_COMPLETE } from './events';
import destroy from './api/destroy';
import startWorkloop from './api/workloop';
import claim from './api/claim';
Expand Down Expand Up @@ -162,6 +162,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
channel: runChannel,
plan,
options,
input,
} = await joinRunChannel(app.socket, token, id, logger);

// Callback to be triggered when the work is done (including errors)
Expand All @@ -176,6 +177,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
engine,
logger,
plan,
input,
options,
onFinish
);
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-worker/src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SanitizePolicies } from '@openfn/logger';
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type { Channel as PhxChannel } from 'phoenix';
import type { ExecutionPlan } from '@openfn/runtime';

export { Socket };

Expand All @@ -9,7 +9,7 @@ export type RunState = {
activeStep?: string;
activeJob?: string;
plan: ExecutionPlan;
options: RunOptions;
input: Lazy<State>;
dataclips: Record<string, any>;
// For each run, map the input ids
// TODO better name maybe?
Expand Down
Loading

0 comments on commit 6b5583b

Please sign in to comment.