Skip to content

Commit

Permalink
feat(@rhtml/amqp): changed a little bit the implementation to extend …
Browse files Browse the repository at this point in the history
…more the api
  • Loading branch information
Stradivario committed Jan 10, 2025
1 parent 91c415d commit af8c0a2
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 12 deletions.
8 changes: 4 additions & 4 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Integrate AMQP with your Fastify controllers for message subscription and publis
import { Controller, Route } from '@rhtml/fastify';
import { FastifyRequest } from 'fastify';
import {
AckCallbackFunction,
AmqpChannel,
AmqpService,
ConsumeMessage,
Subscribe,
Expand All @@ -84,7 +84,7 @@ export class MyController {
noAck: true, // Automatically acknowledge messages
},
})
withAutoAcknowledge(data: ConsumeMessage, ack: AckCallbackFunction) {
withAutoAcknowledge(data: ConsumeMessage, channel: AmqpChannel) {
// Parse the incoming message
const message = JSON.parse(data?.content.toString());
console.log(message);
Expand All @@ -97,13 +97,13 @@ export class MyController {
noAck: false,
},
})
withCustomAcknowledge(data: ConsumeMessage, done: AckCallbackFunction) {
withCustomAcknowledge(data: ConsumeMessage, channel: AmqpChannel) {
const message = JSON.parse(data?.content.toString());

setTimeout(() => {
// Long Running Job can be parsing some file
console.log(message);
done();
channel.ack(data!);
}, 10000);
}

Expand Down
2 changes: 0 additions & 2 deletions packages/amqp/src/amqp.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,3 @@ export type AmqpConnection = Connection;
*/
export const AmqpChannel = new InjectionToken<Channel>();
export type AmqpChannel = Channel;

export type AckCallbackFunction = () => void;
6 changes: 3 additions & 3 deletions packages/amqp/src/amqp.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Inject, Injectable } from '@rhtml/di';
import { ConsumeMessage, Options } from 'amqplib';
import { Channel, ConsumeMessage, Options } from 'amqplib';

import { AmqpChannel } from './amqp.constants';

Expand All @@ -18,7 +18,7 @@ export class AmqpService {

async subscribe(
name: string,
callback: (msg: ConsumeMessage, ack: () => void) => void,
callback: (msg: ConsumeMessage, channel: Channel) => void,
options?: {
assertOptions?: Options.AssertQueue;
consumeOptions?: Options.Consume;
Expand All @@ -28,7 +28,7 @@ export class AmqpService {

await this.channel.consume(
name,
(data) => callback(data!, () => this.channel.ack(data!)),
(data) => callback(data!, this.channel),
options?.consumeOptions
);
}
Expand Down
5 changes: 2 additions & 3 deletions packages/amqp/src/decorators/subscribe.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { set } from '@rhtml/di';
import { ConsumeMessage, Options } from 'amqplib';
import { Options } from 'amqplib';

import { AmqpService } from '../amqp.service';

Expand All @@ -12,7 +12,6 @@ export const Subscribe =
name: string;
consumeOptions?: Options.Consume;
assertOptions?: Options.AssertQueue;
parser?: <T = never>(msg: ConsumeMessage | null) => T;
}) =>
(target: T, memberName: string) => {
const OnInit =
Expand All @@ -25,7 +24,7 @@ export const Subscribe =
const amqpService = set(AmqpService);
await amqpService.subscribe(
name,
(msg, ack) => target[memberName].call(this, msg, ack),
(msg, channel) => target[memberName].call(this, msg, channel),
{ assertOptions, consumeOptions }
);
return OnInit.apply(this, args);
Expand Down

0 comments on commit af8c0a2

Please sign in to comment.