Skip to content

Commit

Permalink
Ability to define target queue per task declaration
Browse files Browse the repository at this point in the history
  • Loading branch information
ilijaNL committed May 26, 2023
1 parent 50814fd commit 0a24f04
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 2 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# pg-tbus

## 0.1.6

### Patch Changes

- Ability to define target queue per task declaration

## 0.1.5

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pg-tbus",
"author": "IlijaNL",
"version": "0.1.5",
"version": "0.1.6",
"types": "dist/index.d.ts",
"module": "dist/index.mjs",
"main": "dist/index.js",
Expand Down
8 changes: 7 additions & 1 deletion src/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export type Bus = {

/**
* Create a pg-tbus instance.
* `serviceName` should be unique for the service.
* `serviceName` should be unique for the service and is used as the queue name.
* Using the same `serviceName` with multiple instance will distribute work across instances.
*/
export const createTBus = (serviceName: string, configuration: TBusConfiguration): Bus => {
Expand Down Expand Up @@ -238,6 +238,12 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration
throw new Error(`task ${definition.task_name} already registered`);
}

if (definition.queue && definition.queue !== serviceName) {
throw new Error(
`task ${definition.task_name} belongs to a different queue. Expected ${serviceName}, got ${definition.queue}`
);
}

taskHandlersMap.set(definition.task_name, {
config: definition.config,
handler: definition.handler,
Expand Down
5 changes: 5 additions & 0 deletions src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ export interface DeclareTaskProps<T extends TSchema> {
* Task name
*/
task_name: string;
/**
* Queue this task belongs to.
* If not specified, the queue will be set to service when task is send from a tbus instance.
*/
queue?: string;
/**
* Task payload schema
*/
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export { Type } from '@sinclair/typebox';
export * from './bus';
export * from './sql';
export * from './definitions';
export type { InsertTask, TaskDTO } from './messages';
21 changes: 21 additions & 0 deletions tests/bus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ tap.test('bus', async (tap) => {
);
});

tap.test('throws when task with different queue is registered', async (t) => {
const bus = createTBus('svc', { db: sqlPool, schema: schema });

const validTask = defineTask({
task_name: 'task_abc',
queue: 'svc',
schema: Type.Object({ works: Type.String() }),
handler: async () => {},
});

const throwTask = defineTask({
task_name: 'task_abc',
queue: 'queuea',
schema: Type.Object({ works: Type.String() }),
handler: async () => {},
});

t.doesNotThrow(() => bus.registerTask(validTask));
t.throws(() => bus.registerTask(throwTask));
});

tap.test('throws when same task is registered', async ({ throws }) => {
const bus = createTBus('svc', { db: sqlPool, schema: schema });
const taskDef1 = defineTask({
Expand Down

0 comments on commit 0a24f04

Please sign in to comment.