Skip to content

Commit

Permalink
Merge pull request #564 from OpenFn/556
Browse files Browse the repository at this point in the history
Rename "runs" to "steps"
  • Loading branch information
taylordowns2000 authored Jan 26, 2024
2 parents ee7b959 + 7d7c2c2 commit c5ace0f
Show file tree
Hide file tree
Showing 32 changed files with 226 additions and 209 deletions.
8 changes: 8 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/integration-tests-worker

## 1.0.32

### Patch Changes

- Updated dependencies [39af8e1]
- @openfn/lightning-mock@1.1.11
- @openfn/ws-worker@0.6.1

## 1.0.31

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.31",
"version": "1.0.32",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
16 changes: 8 additions & 8 deletions integration-tests/worker/test/attempts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);

const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('run:complete', ({ payload }) => {
lightning.on('step:complete', ({ payload }) => {
// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.run_id} done in ${payload.duration / 1000}s [${humanMb(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
payload.mem.job
)} / ${humanMb(payload.mem.system)}mb] [thread ${payload.thread_id}]`
);
Expand Down Expand Up @@ -90,19 +90,19 @@ test.serial('start from a trigger node', async (t) => {
dataclip_id: 's1',
});

lightning.once('run:start', (evt) => {
lightning.once('step:start', (evt) => {
runStartEvent = evt.payload;
});

lightning.once('run:complete', (evt) => {
lightning.once('step:complete', (evt) => {
runCompleteEvent = evt.payload;
});

await run(t, attempt);

t.truthy(runStartEvent);
t.is(runStartEvent.job_id, job.id);
t.truthy(runStartEvent.run_id);
t.truthy(runStartEvent.step_id);
t.is(runStartEvent.input_dataclip_id, 's1');

t.truthy(runCompleteEvent);
Expand Down Expand Up @@ -145,14 +145,14 @@ test.serial('run parallel jobs', async (t) => {
// This saves the dataclip returned by a job
const outputId = {};

lightning.on('run:start', (evt) => {
lightning.on('step:start', (evt) => {
// x and y should both be passed the dataclip produced by job a
if (evt.payload.run_id === x.id || evt.payload.run_id === y.id) {
if (evt.payload.step_id === x.id || evt.payload.step_id === y.id) {
evt.payload.input_dataclip_id = outputId[a.id];
}
});

lightning.on('run:complete', (evt) => {
lightning.on('step:complete', (evt) => {
// save the output dataclip
outputJson[evt.payload.job_id] = evt.payload.output_dataclip_id;
outputJson[evt.payload.job_id] = JSON.parse(evt.payload.output_dataclip);
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/test/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ test.serial.skip('run 100 attempts', async (t) => {
lightning.enqueueAttempt(attempt);
}

lightning.on('run:complete', (evt) => {
lightning.on('step:complete', (evt) => {
// May want to disable this but it's nice feedback
t.log('Completed ', evt.attemptId);

Expand Down
8 changes: 4 additions & 4 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ test('blacklist a non-openfn adaptor', (t) => {
});
});

test('a timeout error should still call run-complete', (t) => {
test('a timeout error should still call step-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand All @@ -346,7 +346,7 @@ test('a timeout error should still call run-complete', (t) => {
t.log('attempt started');
});

lightning.once('run:complete', (event) => {
lightning.once('step:complete', (event) => {
t.is(event.payload.reason, 'kill');
t.is(event.payload.error_type, 'TimeoutError');
});
Expand All @@ -359,7 +359,7 @@ test('a timeout error should still call run-complete', (t) => {
});
});

test('an OOM error should still call run-complete', (t) => {
test('an OOM error should still call step-complete', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
Expand All @@ -378,7 +378,7 @@ test('an OOM error should still call run-complete', (t) => {
],
};

lightning.once('run:complete', (event) => {
lightning.once('step:complete', (event) => {
t.is(event.payload.reason, 'kill');
});

Expand Down
6 changes: 6 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/lightning-mock

## 1.1.11

### Patch Changes

- 39af8e1: Ensure that we refer to the child of a 'run' (aka attempt) as a 'step'

## 1.1.10

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "1.1.10",
"version": "1.1.11",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const setupDevAPI = (
state.pending[attempt.id] = {
status: 'queued',
logs: [],
runs: {},
steps: {},
};
state.queue.push(attempt.id);
};
Expand Down
38 changes: 19 additions & 19 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import {
GET_ATTEMPT,
GET_CREDENTIAL,
GET_DATACLIP,
RUN_COMPLETE,
RUN_START,
STEP_COMPLETE,
STEP_START,
} from './events';
import { extractAttemptId, stringify } from './util';

Expand All @@ -37,10 +37,10 @@ import type {
GetCredentialReply,
GetDataclipPayload,
GetDataClipReply,
RunCompletePayload,
RunCompleteReply,
RunStartPayload,
RunStartReply,
StepCompletePayload,
StepCompleteReply,
StepStartPayload,
StepStartReply,
} from './types';

// dumb cloning id
Expand Down Expand Up @@ -118,7 +118,7 @@ const createSocketAPI = (
state.pending[attemptId] = {
status: 'started',
logs: [],
runs: {},
steps: {},
};

const wrap = <T>(
Expand All @@ -145,9 +145,9 @@ const createSocketAPI = (
[ATTEMPT_START]: wrap(handleStartAttempt),
[GET_CREDENTIAL]: wrap(getCredential),
[GET_DATACLIP]: wrap(getDataclip),
[RUN_START]: wrap(handleRunStart),
[STEP_START]: wrap(handleStepStart),
[ATTEMPT_LOG]: wrap(handleLog),
[RUN_COMPLETE]: wrap(handleRunComplete),
[STEP_COMPLETE]: wrap(handleStepComplete),
[ATTEMPT_COMPLETE]: wrap((...args) => {
handleAttemptComplete(...args);
unsubscribe();
Expand Down Expand Up @@ -369,28 +369,28 @@ const createSocketAPI = (
});
}

function handleRunStart(
function handleStepStart(
state: ServerState,
ws: DevSocket,
evt: PhoenixEvent<RunStartPayload>
evt: PhoenixEvent<StepStartPayload>
) {
const { ref, join_ref, topic } = evt;
const { run_id, job_id, input_dataclip_id } = evt.payload;
const { step_id, job_id, input_dataclip_id } = evt.payload;

const [_, attemptId] = topic.split(':');
if (!state.dataclips) {
state.dataclips = {};
}
state.pending[attemptId].runs[job_id] = run_id;
state.pending[attemptId].steps[job_id] = step_id;

let payload: any = {
status: 'ok',
};

if (!run_id) {
if (!step_id) {
payload = {
status: 'error',
response: 'no run_id',
response: 'no step_id',
};
} else if (!job_id) {
payload = {
Expand All @@ -404,18 +404,18 @@ const createSocketAPI = (
};
}

ws.reply<RunStartReply>({
ws.reply<StepStartReply>({
ref,
join_ref,
topic,
payload,
});
}

function handleRunComplete(
function handleStepComplete(
state: ServerState,
ws: DevSocket,
evt: PhoenixEvent<RunCompletePayload>
evt: PhoenixEvent<StepCompletePayload>
) {
const { ref, join_ref, topic } = evt;
const { output_dataclip_id, output_dataclip } = evt.payload;
Expand All @@ -440,7 +440,7 @@ const createSocketAPI = (
}

// be polite and acknowledge the event
ws.reply<RunCompleteReply>({
ws.reply<StepCompleteReply>({
ref,
join_ref,
topic,
Expand Down
4 changes: 2 additions & 2 deletions packages/lightning-mock/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ export const GET_DATACLIP = 'fetch:dataclip';
export const ATTEMPT_START = 'attempt:start';
export const ATTEMPT_COMPLETE = 'attempt:complete';
export const ATTEMPT_LOG = 'attempt:log';
export const RUN_START = 'run:start';
export const RUN_COMPLETE = 'run:complete';
export const STEP_START = 'step:start';
export const STEP_COMPLETE = 'step:complete';
4 changes: 2 additions & 2 deletions packages/lightning-mock/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import createDevAPI from './api-dev';

import type { AttemptLogPayload, Attempt, DevServer } from './types';

type RunId = string;
type StepId = string;
type JobId = string;

export type AttemptState = {
status: 'queued' | 'started' | 'complete';
logs: AttemptLogPayload[];
runs: Record<JobId, RunId>;
steps: Record<JobId, StepId>;
};

export type ServerState = {
Expand Down
14 changes: 7 additions & 7 deletions packages/lightning-mock/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,23 @@ export type AttemptLogPayload = {
level?: string;
source?: string; // namespace
job_id?: string;
run_id?: string;
step_id?: string;
};
export type AttemptLogReply = void;

export type RunStartPayload = {
export type StepStartPayload = {
job_id: string;
run_id: string;
step_id: string;
attempt_id?: string;
input_dataclip_id?: string;
};
export type RunStartReply = void;
export type StepStartReply = void;

export type RunCompletePayload = ExitReason & {
export type StepCompletePayload = ExitReason & {
attempt_id?: string;
job_id: string;
run_id: string;
step_id: string;
output_dataclip?: string;
output_dataclip_id?: string;
};
export type RunCompleteReply = void;
export type StepCompleteReply = void;
2 changes: 1 addition & 1 deletion packages/lightning-mock/test/channels/attempt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ test.serial('complete an attempt through the attempt channel', async (t) => {
t.deepEqual(pending[a.id], {
status: 'complete',
logs: [],
runs: {},
steps: {},
});
t.deepEqual(results[a.id].state, { answer: 42 });
done();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import test from 'ava';
import { RUN_COMPLETE } from '../../src/events';
import { STEP_COMPLETE } from '../../src/events';

import { join, setup, createAttempt } from '../util';

Expand All @@ -24,7 +24,7 @@ test.serial('acknowledge valid message', async (t) => {

const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('ok', (evt) => {
channel.push(STEP_COMPLETE, event).receive('ok', (evt) => {
t.pass('event acknowledged');
done();
});
Expand All @@ -45,7 +45,7 @@ test.serial('save dataclip id to state', async (t) => {

const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('ok', () => {
channel.push(STEP_COMPLETE, event).receive('ok', () => {
t.deepEqual(server.state.dataclips.t, JSON.parse(event.output_dataclip));
done();
});
Expand All @@ -65,7 +65,7 @@ test.serial('error if no reason', async (t) => {
};
const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('error', () => {
channel.push(STEP_COMPLETE, event).receive('error', () => {
t.pass('event rejected');
done();
});
Expand All @@ -88,7 +88,7 @@ test.serial('error if no output dataclip', async (t) => {
};
const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('error', (e) => {
channel.push(STEP_COMPLETE, event).receive('error', (e) => {
t.is(e.toString(), 'no output_dataclip');
done();
});
Expand All @@ -108,7 +108,7 @@ test.serial('error if no output dataclip_id', async (t) => {
};
const channel = await join(client, attempt.id);

channel.push(RUN_COMPLETE, event).receive('error', (e) => {
channel.push(STEP_COMPLETE, event).receive('error', (e) => {
t.is(e.toString(), 'no output_dataclip_id');
done();
});
Expand Down
Loading

0 comments on commit c5ace0f

Please sign in to comment.