Skip to content

Commit

Permalink
fix: remove input from step context
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Jan 27, 2024
1 parent c6ffa83 commit dc03647
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
8 changes: 4 additions & 4 deletions typescript-sdk/examples/dag-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const workflow: Workflow = {
steps: [
{
name: 'dag-step1',
run: async (input, ctx) => {
run: async (ctx) => {
console.log('executed step1!');
await sleep(5000);
return { step1: 'step1' };
Expand All @@ -28,23 +28,23 @@ const workflow: Workflow = {
{
name: 'dag-step2',
parents: ['dag-step1'],
run: (input, ctx) => {
run: (ctx) => {
console.log('executed step2!');
return { step2: 'step2' };
},
},
{
name: 'dag-step3',
parents: ['dag-step1', 'dag-step2'],
run: (input, ctx) => {
run: (ctx) => {
console.log('executed step3!');
return { step3: 'step3' };
},
},
{
name: 'dag-step4',
parents: ['dag-step1', 'dag-step3'],
run: (input, ctx) => {
run: (ctx) => {
console.log('executed step4!');
return { step4: 'step4' };
},
Expand Down
13 changes: 7 additions & 6 deletions typescript-sdk/examples/simple-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ const workflow: Workflow = {
steps: [
{
name: 'step1',
run: async (input, ctx) => {
console.log('starting step1!');
run: async (ctx) => {
console.log('starting step1 with the following input', ctx.workflowInput());
console.log('waiting 5 seconds...');
await sleep(5000);
console.log('executed step1!');
return { step1: 'step1' };
return { step1: 'step1 results!' };
},
},
{
name: 'step2',
parents: ['step1'],
run: (input, ctx) => {
console.log('executed step2!', ctx.workflowInput());
return { step2: 'step2' };
run: (ctx) => {
console.log('executed step2 after step1 returned ', ctx.stepOutput('step1'));
return { step2: 'step2 results!' };
},
},
],
Expand Down
9 changes: 4 additions & 5 deletions typescript-sdk/hatchet/clients/worker/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { never } from 'zod';
import sleep from '@util/sleep';
import { ChannelCredentials } from 'nice-grpc';
import { Worker } from './worker';
import { Context } from '@hatchet/step';

type AssignActionMock = AssignedAction | Error;

Expand All @@ -19,7 +20,7 @@ const mockStart: AssignActionMock = {
stepRunId: 'runStep1',
actionId: 'action1',
actionType: ActionType.START_STEP_RUN,
actionPayload: '{"input": {"data": 1}}',
actionPayload: JSON.stringify('{"input": {"data": 1}}'),
};

const mockCancel: AssignActionMock = {
Expand Down Expand Up @@ -63,8 +64,7 @@ describe('Worker', () => {
steps: [
{
name: 'step1',
run: (input: any, ctx: any) => {
console.log('step1', input, ctx);
run: (ctx: any) => {
return { test: 'test' };
},
},
Expand Down Expand Up @@ -106,7 +106,7 @@ describe('Worker', () => {
await sleep(100);

expect(startSpy).toHaveBeenCalledTimes(1);
expect(startSpy).toHaveBeenCalledWith({ data: 1 }, expect.anything());

expect(getActionEventSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
Expand Down Expand Up @@ -139,7 +139,6 @@ describe('Worker', () => {
await sleep(100);

expect(startSpy).toHaveBeenCalledTimes(1);
expect(startSpy).toHaveBeenCalledWith({ data: 1 }, expect.anything());
expect(getActionEventSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
Expand Down
2 changes: 1 addition & 1 deletion typescript-sdk/hatchet/clients/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class Worker {
}

const run = async () => {
return step(context.workflowInput(), context);
return step(context);
};

const success = (result: any) => {
Expand Down
14 changes: 10 additions & 4 deletions typescript-sdk/hatchet/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ export const CreateStepSchema = z.object({

export type NextStep = { [key: string]: string };

export class Context<T = any> {
data: T | any;
interface ContextData<T = unknown> {
input: T;
parents: Record<string, any>;
triggered_by_event: string;
}

export class Context<T = unknown> {
data: ContextData<T>;
constructor(payload: string) {
try {
this.data = JSON.parse(payload);
this.data = JSON.parse(JSON.parse(payload));
} catch (e: any) {
throw new HatchetError(`Could not parse payload: ${e.message}`);
}
Expand All @@ -38,5 +44,5 @@ export class Context<T = any> {
}

export interface CreateStep<T> extends z.infer<typeof CreateStepSchema> {
run: (input: T, ctx: Context) => Promise<NextStep> | NextStep | void;
run: (ctx: Context) => Promise<NextStep> | NextStep | void;
}

0 comments on commit dc03647

Please sign in to comment.