diff --git a/typescript-sdk/examples/concurrency/concurrency-event.ts b/typescript-sdk/examples/concurrency/concurrency-event.ts index b95f3b411..cc32423c6 100644 --- a/typescript-sdk/examples/concurrency/concurrency-event.ts +++ b/typescript-sdk/examples/concurrency/concurrency-event.ts @@ -2,6 +2,25 @@ import Hatchet from '../../src/sdk'; const hatchet = Hatchet.init(); -hatchet.event.push('concurrency:create', { - test: 'test', -}); +const sleep = (ms: number) => + new Promise((resolve) => { + setTimeout(resolve, ms); + }); + +async function main() { + hatchet.event.push('concurrency:created', { + data: 'event 1', + }); + + // step 1 will wait 5000 ms, + // so sending a second event + // before that will cancel + // the first run and run the second event + await sleep(1000); + + hatchet.event.push('concurrency:create', { + data: 'event 2', + }); +} + +main(); diff --git a/typescript-sdk/examples/concurrency/concurrency-worker.ts b/typescript-sdk/examples/concurrency/concurrency-worker.ts index 759e76d82..1974d1e1d 100644 --- a/typescript-sdk/examples/concurrency/concurrency-worker.ts +++ b/typescript-sdk/examples/concurrency/concurrency-worker.ts @@ -12,7 +12,7 @@ const workflow: Workflow = { id: 'concurrency-example', description: 'test', on: { - event: 'concurrency:create', + event: 'concurrency:created', }, concurrency: { key: (ctx) => ctx.workflowInput().userId, @@ -21,10 +21,11 @@ const workflow: Workflow = { { name: 'step1', run: async (ctx) => { - console.log('starting step1 and waiting 5 seconds...'); + const { data } = ctx.workflowInput(); + console.log('starting step1 and waiting 5 seconds...', data); await sleep(5000); console.log('executed step1!'); - return { step1: 'step1 results!' }; + return { step1: `step1 results for ${data}!` }; }, }, { diff --git a/typescript-sdk/src/clients/worker/worker.ts b/typescript-sdk/src/clients/worker/worker.ts index f95154d92..39fc0f33d 100644 --- a/typescript-sdk/src/clients/worker/worker.ts +++ b/typescript-sdk/src/clients/worker/worker.ts @@ -10,7 +10,11 @@ import { } from '@hatchet/protoc/dispatcher'; import HatchetPromise from '@util/hatchet-promise/hatchet-promise'; import { Workflow } from '@hatchet/workflow'; -import { CreateWorkflowStepOpts, WorkflowConcurrencyOpts } from '@hatchet/protoc/workflows'; +import { + ConcurrencyLimitStrategy, + CreateWorkflowStepOpts, + WorkflowConcurrencyOpts, +} from '@hatchet/protoc/workflows'; import { Logger } from '@hatchet/util/logger'; import sleep from '@hatchet/util/sleep'; import { Context } from '../../step'; @@ -25,6 +29,7 @@ export class Worker { handle_kill: boolean; action_registry: ActionRegistry; + concurrency_action_registry: ActionRegistry; listener: ActionListener | undefined; futures: Record> = {}; contexts: Record> = {}; @@ -35,6 +40,7 @@ export class Worker { this.client = client; this.name = options.name; this.action_registry = {}; + this.concurrency_action_registry = {}; process.on('SIGTERM', () => this.exitGracefully()); process.on('SIGINT', () => this.exitGracefully()); @@ -47,7 +53,14 @@ export class Worker { async registerWorkflow(workflow: Workflow) { try { - const concurrency: WorkflowConcurrencyOpts | undefined = undefined; + const concurrency: WorkflowConcurrencyOpts | undefined = workflow.concurrency?.action + ? { + action: `${this.serviceName}:${workflow.concurrency.action}`, + maxRuns: workflow.concurrency.maxRuns || 1, + limitStrategy: + workflow.concurrency.limitStrategy || ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS, + } + : undefined; await this.client.admin.put_workflow({ name: workflow.id, @@ -80,6 +93,12 @@ export class Worker { acc[`${this.serviceName}:${step.name}`] = step.run; return acc; }, {}); + + this.concurrency_action_registry = workflow.concurrency?.action + ? { + [`${this.serviceName}:${workflow.concurrency.action}`]: workflow.concurrency.key, + } + : {}; } handleStartStepRun(action: Action) { @@ -308,7 +327,9 @@ export class Worker { this.logger.info(`Worker ${this.name} listening for actions`); for await (const action of generator) { - this.logger.info(`Worker ${this.name} received action ${action.actionId}`); + this.logger.info( + `Worker ${this.name} received action ${action.actionId}:${action.actionType}` + ); if (action.actionType === ActionType.START_STEP_RUN) { this.handleStartStepRun(action); diff --git a/typescript-sdk/src/workflow.ts b/typescript-sdk/src/workflow.ts index 9f6dbdbd2..afcee5aa6 100644 --- a/typescript-sdk/src/workflow.ts +++ b/typescript-sdk/src/workflow.ts @@ -40,7 +40,7 @@ export const CreateWorkflowSchema = z.object({ export interface Workflow extends z.infer { concurrency?: z.infer & { - key: string | ((ctx: any) => string); + key: (ctx: any) => string; }; steps: CreateStep[]; }