Skip to content

Commit

Permalink
wip: register concurrency opts
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Feb 3, 2024
1 parent ca7c355 commit cd062f6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
25 changes: 22 additions & 3 deletions typescript-sdk/examples/concurrency/concurrency-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@ import Hatchet from '../../src/sdk';

const hatchet = Hatchet.init();

hatchet.event.push('concurrency:create', {
test: 'test',
});
const sleep = (ms: number) =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});

async function main() {
hatchet.event.push('concurrency:created', {
data: 'event 1',
});

// step 1 will wait 5000 ms,
// so sending a second event
// before that will cancel
// the first run and run the second event
await sleep(1000);

hatchet.event.push('concurrency:create', {
data: 'event 2',
});
}

main();
7 changes: 4 additions & 3 deletions typescript-sdk/examples/concurrency/concurrency-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const workflow: Workflow = {
id: 'concurrency-example',
description: 'test',
on: {
event: 'concurrency:create',
event: 'concurrency:created',
},
concurrency: {
key: (ctx) => ctx.workflowInput().userId,
Expand All @@ -21,10 +21,11 @@ const workflow: Workflow = {
{
name: 'step1',
run: async (ctx) => {
console.log('starting step1 and waiting 5 seconds...');
const { data } = ctx.workflowInput();
console.log('starting step1 and waiting 5 seconds...', data);
await sleep(5000);
console.log('executed step1!');
return { step1: 'step1 results!' };
return { step1: `step1 results for ${data}!` };
},
},
{
Expand Down
27 changes: 24 additions & 3 deletions typescript-sdk/src/clients/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import {
} from '@hatchet/protoc/dispatcher';
import HatchetPromise from '@util/hatchet-promise/hatchet-promise';
import { Workflow } from '@hatchet/workflow';
import { CreateWorkflowStepOpts, WorkflowConcurrencyOpts } from '@hatchet/protoc/workflows';
import {
ConcurrencyLimitStrategy,
CreateWorkflowStepOpts,
WorkflowConcurrencyOpts,
} from '@hatchet/protoc/workflows';
import { Logger } from '@hatchet/util/logger';
import sleep from '@hatchet/util/sleep';
import { Context } from '../../step';
Expand All @@ -25,6 +29,7 @@ export class Worker {
handle_kill: boolean;

action_registry: ActionRegistry;
concurrency_action_registry: ActionRegistry;
listener: ActionListener | undefined;
futures: Record<Action['stepRunId'], HatchetPromise<any>> = {};
contexts: Record<Action['stepRunId'], Context<any>> = {};
Expand All @@ -35,6 +40,7 @@ export class Worker {
this.client = client;
this.name = options.name;
this.action_registry = {};
this.concurrency_action_registry = {};

process.on('SIGTERM', () => this.exitGracefully());
process.on('SIGINT', () => this.exitGracefully());
Expand All @@ -47,7 +53,14 @@ export class Worker {

async registerWorkflow(workflow: Workflow) {
try {
const concurrency: WorkflowConcurrencyOpts | undefined = undefined;
const concurrency: WorkflowConcurrencyOpts | undefined = workflow.concurrency?.action
? {
action: `${this.serviceName}:${workflow.concurrency.action}`,
maxRuns: workflow.concurrency.maxRuns || 1,
limitStrategy:
workflow.concurrency.limitStrategy || ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
}
: undefined;

await this.client.admin.put_workflow({
name: workflow.id,
Expand Down Expand Up @@ -80,6 +93,12 @@ export class Worker {
acc[`${this.serviceName}:${step.name}`] = step.run;
return acc;
}, {});

this.concurrency_action_registry = workflow.concurrency?.action
? {
[`${this.serviceName}:${workflow.concurrency.action}`]: workflow.concurrency.key,
}
: {};
}

handleStartStepRun(action: Action) {
Expand Down Expand Up @@ -308,7 +327,9 @@ export class Worker {
this.logger.info(`Worker ${this.name} listening for actions`);

for await (const action of generator) {
this.logger.info(`Worker ${this.name} received action ${action.actionId}`);
this.logger.info(
`Worker ${this.name} received action ${action.actionId}:${action.actionType}`
);

if (action.actionType === ActionType.START_STEP_RUN) {
this.handleStartStepRun(action);
Expand Down
2 changes: 1 addition & 1 deletion typescript-sdk/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const CreateWorkflowSchema = z.object({

export interface Workflow extends z.infer<typeof CreateWorkflowSchema> {
concurrency?: z.infer<typeof WorkflowConcurrency> & {
key: string | ((ctx: any) => string);
key: (ctx: any) => string;
};
steps: CreateStep<any>[];
}

0 comments on commit cd062f6

Please sign in to comment.