Skip to content

Commit

Permalink
wip: concurrency
Browse files Browse the repository at this point in the history
grutt committed Feb 2, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent be0a17b commit 5270c63
Showing 18 changed files with 777 additions and 151 deletions.
41 changes: 41 additions & 0 deletions typescript-sdk/examples/concurrency-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Hatchet from '../src/sdk';
import { Workflow } from '../src/workflow';

const hatchet = Hatchet.init();

const sleep = (ms: number) =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});

const workflow: Workflow = {
id: 'example',
description: 'test',
on: {
event: 'user:create',
},
concurrency: {
key: (ctx) => ctx.workflowInput().userId,
},
steps: [
{
name: 'step1',
run: async (ctx) => {
console.log('starting step1 and waiting 5 seconds...');
await sleep(5000);
console.log('executed step1!');
return { step1: 'step1 results!' };
},
},
{
name: 'step2',
parents: ['step1'],
run: (ctx) => {
console.log('executed step2 after step1 returned ', ctx.stepOutput('step1'));
return { step2: 'step2 results!' };
},
},
],
};

hatchet.run(workflow);
4 changes: 2 additions & 2 deletions typescript-sdk/examples/dag-worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Hatchet from '@hatchet/sdk';
import { Workflow } from '@hatchet/workflow';
import Hatchet from '../src/sdk';
import { Workflow } from '../src/workflow';

const hatchet = Hatchet.init({
log_level: 'OFF',
2 changes: 1 addition & 1 deletion typescript-sdk/examples/example-event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Hatchet from '@hatchet/sdk';
import Hatchet from '../src/sdk';

const hatchet = Hatchet.init();

4 changes: 2 additions & 2 deletions typescript-sdk/examples/simple-worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Hatchet from '@hatchet/sdk';
import { Workflow } from '@hatchet/workflow';
import Hatchet from '../src/sdk';
import { Workflow } from '../src/workflow';

const hatchet = Hatchet.init();

3 changes: 2 additions & 1 deletion typescript-sdk/package.json
Original file line number Diff line number Diff line change
@@ -28,7 +28,8 @@
"exec": "npx dotenv -- ts-node -r tsconfig-paths/register --project tsconfig.json",
"example:event": "npm run exec -- ./examples/example-event.ts",
"worker:simple": "npm run exec -- ./examples/simple-worker.ts",
"worker:dag": "npm run exec -- ./examples/dag-worker.ts"
"worker:dag": "npm run exec -- ./examples/dag-worker.ts",
"worker:concurrency": "npm run exec -- ./examples/concurrency-worker.ts"
},
"keywords": [],
"author": "",
4 changes: 3 additions & 1 deletion typescript-sdk/src/clients/admin/admin-client.test.ts
Original file line number Diff line number Diff line change
@@ -51,9 +51,10 @@ describe('AdminClient', () => {
cronTriggers: [],
scheduledTriggers: [],
jobs: [],
concurrency: undefined,
};

expect(() => client.put_workflow(workflow, { autoVersion: false })).rejects.toThrow(
expect(() => client.put_workflow(workflow)).rejects.toThrow(
'PutWorkflow error: workflow version is required, or use autoVersion'
);
});
@@ -67,6 +68,7 @@ describe('AdminClient', () => {
cronTriggers: [],
scheduledTriggers: [],
jobs: [],
concurrency: undefined,
};

const putSpy = jest.spyOn(client.client, 'putWorkflow').mockResolvedValue({
6 changes: 1 addition & 5 deletions typescript-sdk/src/clients/admin/admin-client.ts
Original file line number Diff line number Diff line change
@@ -16,11 +16,7 @@ export class AdminClient {
this.client = factory.create(WorkflowServiceDefinition, channel);
}

async put_workflow(workflow: CreateWorkflowVersionOpts, options?: { autoVersion?: boolean }) {
if (workflow.version === '' && !options?.autoVersion) {
throw new HatchetError('PutWorkflow error: workflow version is required, or use autoVersion');
}

async put_workflow(workflow: CreateWorkflowVersionOpts) {
try {
await this.client.putWorkflow({
opts: workflow,
4 changes: 4 additions & 0 deletions typescript-sdk/src/clients/dispatcher/action-listener.test.ts
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ const mockAssignedActions: AssignActionMock[] = [
actionId: 'action1',
actionType: ActionType.START_STEP_RUN,
actionPayload: 'payload1',
workflowRunId: 'workflowRun1',
getGroupKeyRunId: 'groupKeyRun1',
},
// ... Add more mock AssignedAction objects as needed
];
@@ -90,6 +92,8 @@ describe('ActionListener', () => {
actionId: 'action1',
actionType: ActionType.START_STEP_RUN,
actionPayload: 'payload1',
workflowRunId: 'workflowRun1',
getGroupKeyRunId: 'groupKeyRun1',
});
});

2 changes: 2 additions & 0 deletions typescript-sdk/src/clients/dispatcher/action-listener.ts
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ export interface Action {
actionId: string;
actionType: number;
actionPayload: string;
workflowRunId: string;
getGroupKeyRunId: string;
}

export class ActionListener {
12 changes: 6 additions & 6 deletions typescript-sdk/src/clients/dispatcher/dispatcher-client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ActionEventType } from '@hatchet/protoc/dispatcher';
import { StepActionEventType } from '@hatchet/protoc/dispatcher';
import { DispatcherClient } from './dispatcher-client';
import { mockChannel, mockFactory } from '../hatchet-client/hatchet-client.test';

@@ -55,7 +55,7 @@ describe('DispatcherClient', () => {

const listenerSpy = jest.spyOn(client.client, 'listen');

const listener = await client.get_action_listener({
const listener = await client.getActionListener({
workerName: 'WORKER_NAME',
services: ['SERVICE'],
actions: ['ACTION'],
@@ -78,15 +78,15 @@ describe('DispatcherClient', () => {

describe('send_action_event', () => {
it('should send action events', () => {
const clientSpy = jest.spyOn(client.client, 'sendActionEvent').mockResolvedValue({
const clientSpy = jest.spyOn(client.client, 'sendStepActionEvent').mockResolvedValue({
tenantId: 'TENANT_ID',
workerId: 'WORKER_ID',
});

client.send_action_event({
client.sendStepActionEvent({
workerId: 'WORKER_ID',
actionId: 'ACTION_ID',
eventType: ActionEventType.STEP_EVENT_TYPE_COMPLETED,
eventType: StepActionEventType.STEP_EVENT_TYPE_COMPLETED,
eventPayload: '{"foo":"bar"}',
eventTimestamp: new Date(),
jobId: 'a',
@@ -98,7 +98,7 @@ describe('DispatcherClient', () => {
expect(clientSpy).toHaveBeenCalledWith({
workerId: 'WORKER_ID',
actionId: 'ACTION_ID',
eventType: ActionEventType.STEP_EVENT_TYPE_COMPLETED,
eventType: StepActionEventType.STEP_EVENT_TYPE_COMPLETED,
eventPayload: '{"foo":"bar"}',
jobId: 'a',
jobRunId: 'b',
17 changes: 13 additions & 4 deletions typescript-sdk/src/clients/dispatcher/dispatcher-client.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@ import { Channel, ClientFactory } from 'nice-grpc';
import {
DispatcherClient as PbDispatcherClient,
DispatcherDefinition,
ActionEvent,
StepActionEvent,
GroupKeyActionEvent,
} from '@hatchet/protoc/dispatcher';
import { ClientConfig } from '@clients/hatchet-client/client-config';
import HatchetError from '@util/errors/hatchet-error';
@@ -23,7 +24,7 @@ export class DispatcherClient {
this.client = factory.create(DispatcherDefinition, channel);
}

async get_action_listener(options: GetActionListenerOptions) {
async getActionListener(options: GetActionListenerOptions) {
// Register the worker
const registration = await this.client.register({
...options,
@@ -37,9 +38,17 @@ export class DispatcherClient {
return new ActionListener(this, listener, registration.workerId);
}

async send_action_event(in_: ActionEvent) {
async sendStepActionEvent(in_: StepActionEvent) {
try {
return this.client.sendActionEvent(in_);
return this.client.sendStepActionEvent(in_);
} catch (e: any) {
throw new HatchetError(e.message);
}
}

async sendGroupKeyActionEvent(in_: GroupKeyActionEvent) {
try {
return this.client.sendGroupKeyActionEvent(in_);
} catch (e: any) {
throw new HatchetError(e.message);
}
Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ export class HatchetClient {
});

if (typeof workflow !== 'string') {
await worker.register_workflow(workflow);
await worker.registerWorkflow(workflow);
return worker;
}

Loading

0 comments on commit 5270c63

Please sign in to comment.