From 0a24f04c88a6f6c0bdce458f1acaeacf9eb7b762 Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Fri, 26 May 2023 10:06:38 +0200 Subject: [PATCH] Ability to define target queue per task declaration --- CHANGELOG.md | 6 ++++++ package.json | 2 +- src/bus.ts | 8 +++++++- src/definitions.ts | 5 +++++ src/index.ts | 1 + tests/bus.test.ts | 21 +++++++++++++++++++++ 6 files changed, 41 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d98518b..0c66b75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # pg-tbus +## 0.1.6 + +### Patch Changes + +- Ability to define target queue per task declaration + ## 0.1.5 ### Patch Changes diff --git a/package.json b/package.json index 1f7e61c..4732a53 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "pg-tbus", "author": "IlijaNL", - "version": "0.1.5", + "version": "0.1.6", "types": "dist/index.d.ts", "module": "dist/index.mjs", "main": "dist/index.js", diff --git a/src/bus.ts b/src/bus.ts index e6864c2..f811f94 100644 --- a/src/bus.ts +++ b/src/bus.ts @@ -122,7 +122,7 @@ export type Bus = { /** * Create a pg-tbus instance. - * `serviceName` should be unique for the service. + * `serviceName` should be unique for the service and is used as the queue name. * Using the same `serviceName` with multiple instance will distribute work across instances. */ export const createTBus = (serviceName: string, configuration: TBusConfiguration): Bus => { @@ -238,6 +238,12 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration throw new Error(`task ${definition.task_name} already registered`); } + if (definition.queue && definition.queue !== serviceName) { + throw new Error( + `task ${definition.task_name} belongs to a different queue. Expected ${serviceName}, got ${definition.queue}` + ); + } + taskHandlersMap.set(definition.task_name, { config: definition.config, handler: definition.handler, diff --git a/src/definitions.ts b/src/definitions.ts index b27742d..7e60949 100644 --- a/src/definitions.ts +++ b/src/definitions.ts @@ -94,6 +94,11 @@ export interface DeclareTaskProps { * Task name */ task_name: string; + /** + * Queue this task belongs to. + * If not specified, the queue will be set to service when task is send from a tbus instance. + */ + queue?: string; /** * Task payload schema */ diff --git a/src/index.ts b/src/index.ts index 9dfa7f8..3c5779b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,3 +5,4 @@ export { Type } from '@sinclair/typebox'; export * from './bus'; export * from './sql'; export * from './definitions'; +export type { InsertTask, TaskDTO } from './messages'; diff --git a/tests/bus.test.ts b/tests/bus.test.ts index a60c3cc..d115cab 100644 --- a/tests/bus.test.ts +++ b/tests/bus.test.ts @@ -129,6 +129,27 @@ tap.test('bus', async (tap) => { ); }); + tap.test('throws when task with different queue is registered', async (t) => { + const bus = createTBus('svc', { db: sqlPool, schema: schema }); + + const validTask = defineTask({ + task_name: 'task_abc', + queue: 'svc', + schema: Type.Object({ works: Type.String() }), + handler: async () => {}, + }); + + const throwTask = defineTask({ + task_name: 'task_abc', + queue: 'queuea', + schema: Type.Object({ works: Type.String() }), + handler: async () => {}, + }); + + t.doesNotThrow(() => bus.registerTask(validTask)); + t.throws(() => bus.registerTask(throwTask)); + }); + tap.test('throws when same task is registered', async ({ throws }) => { const bus = createTBus('svc', { db: sqlPool, schema: schema }); const taskDef1 = defineTask({