Skip to content

Commit

Permalink
merge pull request from DIY0R/helth-check
Browse files Browse the repository at this point in the history
feat: helth check and events connection
  • Loading branch information
DIY0R authored Jun 25, 2024
2 parents 270a419 + 8496f48 commit 8dc16f6
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 42 deletions.
2 changes: 1 addition & 1 deletion lib/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class RQMColorLogger implements LoggerService {
Logger.log(message, context);
}
error(message: any, trace?: string, context?: string): any {
Logger.error(message, trace, context);
Logger.error(message);
}
debug(message: any, context?: string): any {
if (!this.logMessages) {
Expand Down
8 changes: 8 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error';
export const ERROR_RMQ_SERVICE = 'Rmq service error';
export const INOF_NOT_FULL_OPTIONS =
'Queue will not be created if there is no bind';

export const CLOSE_MESSAGE = 'Disconnected from RMQ. Trying to reconnect';
export const CONNECT_FAILED_MESSAGE = 'Failed to connect to RMQ';
export const WRONG_CREDENTIALS_MESSAGE = 'Wrong credentials for RMQ';
export const CONNECT_BLOCKED_MESSAGE = 'Connection blocked';
export const CLOSE_EVENT = 'close';
export const CONNECT_ERROR = 'error';
export const CONNECT_BLOCKED = 'blocked';
62 changes: 46 additions & 16 deletions lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import {
Inject,
Injectable,
LoggerService,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { RMQ_CONNECT_OPTIONS } from './constants';
import {
CLOSE_EVENT,
CLOSE_MESSAGE,
CONNECT_BLOCKED,
CONNECT_BLOCKED_MESSAGE,
CONNECT_ERROR,
CONNECT_FAILED_MESSAGE,
RMQ_APP_OPTIONS,
RMQ_CONNECT_OPTIONS,
WRONG_CREDENTIALS_MESSAGE,
} from './constants';
import {
IRabbitMQConfig,
IExchange,
Expand All @@ -15,17 +26,25 @@ import {
ISendToReplyQueueOptions,
} from './interfaces';
import { Channel, Connection, ConsumeMessage, Replies, connect } from 'amqplib';
import { IAppOptions } from './interfaces/app-options.interface';
import { RQMColorLogger } from './common/logger';

@Injectable()
export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
private connection: Connection = null;
private baseChannel: Channel = null;
private replyToChannel: Channel = null;

public isConnected = false;
private logger: LoggerService;
private declared = false;
constructor(
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig
) {}
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig,
@Inject(RMQ_APP_OPTIONS) private appOptions: IAppOptions,
) {
this.logger = appOptions.logger
? appOptions.logger
: new RQMColorLogger(this.appOptions.logMessages);
}
async onModuleInit(): Promise<void> {
if (this.declared) throw Error('Root RmqNestjsModule already declared!');
await this.setUpConnect(this.options);
Expand All @@ -34,18 +53,18 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}

public async assertExchange(
options: IExchange
options: IExchange,
): Promise<Replies.AssertExchange> {
try {
const exchange = await this.baseChannel.assertExchange(
options.exchange,
options.type,
options.options
options.options,
);
return exchange;
} catch (error) {
throw new Error(
`Failed to assert exchange '${options.exchange}': ${error.message}`
`Failed to assert exchange '${options.exchange}': ${error.message}`,
);
}
}
Expand All @@ -56,19 +75,19 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}
public async assertQueue(
typeQueue: TypeQueue,
options: IQueue
options: IQueue,
): Promise<Replies.AssertQueue> {
try {
if (typeQueue == TypeQueue.QUEUE) {
const queue = await this.baseChannel.assertQueue(
options.queue,
options.options
options.options,
);
return queue;
}
const queue = await this.replyToChannel.assertQueue(
options.queue || '',
options.options
options.options,
);
return queue;
} catch (error) {
Expand All @@ -81,11 +100,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
bindQueue.queue,
bindQueue.source,
bindQueue.pattern,
bindQueue.args
bindQueue.args,
);
} catch (error) {
throw new Error(
`Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}`
`Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}`,
);
}
}
Expand All @@ -96,15 +115,15 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
Buffer.from(JSON.stringify(sendToQueueOptions.content)),
{
correlationId: sendToQueueOptions.correlationId,
}
},
);
} catch (error) {
throw new Error(`Failed to send Reply Queue`);
}
}
async listenReplyQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
listenQueue: (msg: ConsumeMessage | null) => void,
) {
try {
await this.replyToChannel.consume(queue, listenQueue, {
Expand All @@ -116,7 +135,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}
async listenQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
listenQueue: (msg: ConsumeMessage | null) => void,
): Promise<void> {
try {
await this.baseChannel.consume(queue, listenQueue, {
Expand All @@ -136,7 +155,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
{
replyTo: sendMessage.options.replyTo,
correlationId: sendMessage.options.correlationId,
}
},
);
} catch (error) {
throw new Error(`Failed to send message ${error}`);
Expand All @@ -146,6 +165,17 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
const { username, password, hostname, port, virtualHost } = options;
const url = `amqp://${username}:${password}@${hostname}:${port}/${virtualHost}`;
this.connection = await connect(url);
this.isConnected = true;
this.connection.on(CLOSE_EVENT, (err) => {
this.isConnected = false;
this.logger.error(CLOSE_MESSAGE);
});
this.connection.on(CONNECT_ERROR, (err) => {
this.logger.error(CONNECT_FAILED_MESSAGE);
});
this.connection.on(CONNECT_BLOCKED, (err) => {
this.logger.error(CONNECT_BLOCKED_MESSAGE);
});
}
private async createChannels() {
try {
Expand Down
31 changes: 17 additions & 14 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
private replyToQueue: Replies.AssertQueue = null;
private exchange: Replies.AssertExchange = null;
private isInitialized: boolean = false;
private connected = false;
private logger: LoggerService;
constructor(
private readonly rmqNestjsConnectService: RmqNestjsConnectService,
private readonly metaTegsScannerService: MetaTegsScannerService,
@Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker,
@Inject(RMQ_APP_OPTIONS) private appOptions: IAppOptions,
@Inject(MODULE_TOKEN) private readonly moduleToken: string
@Inject(MODULE_TOKEN) private readonly moduleToken: string,
) {
this.logger = appOptions.logger
? appOptions.logger
Expand All @@ -51,16 +52,18 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
async onModuleInit() {
this.rmqMessageTegs = this.metaTegsScannerService.scan(
RMQ_MESSAGE_META_TEG,
this.moduleToken
this.moduleToken,
);
await this.init();
this.isInitialized = true;
}

public healthCheck() {
return this.rmqNestjsConnectService.isConnected;
}
public async notify<IMessage>(
topic: string,
message: IMessage,
options?: IPublishOptions
options?: IPublishOptions,
) {
await this.initializationCheck();
this.rmqNestjsConnectService.publish({
Expand All @@ -78,7 +81,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
public async send<IMessage, IReply>(
topic: string,
message: IMessage,
options?: IPublishOptions
options?: IPublishOptions,
): Promise<IReply> {
await this.initializationCheck();
if (!this.replyToQueue) return this.logger.error(INDICATE_ERROR);
Expand Down Expand Up @@ -113,7 +116,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey);
if (!consumeFunction) return;
const result = await consumeFunction(
JSON.parse(message.content.toString())
JSON.parse(message.content.toString()),
);
if (message.properties.replyTo) {
await this.rmqNestjsConnectService.sendToReplyQueue({
Expand All @@ -125,7 +128,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
this.rmqNestjsConnectService.ack(message);
}
private async listenReplyQueue(
message: ConsumeMessage | null
message: ConsumeMessage | null,
): Promise<void> {
if (message.properties.correlationId) {
this.sendResponseEmitter.emit(message.properties.correlationId, message);
Expand All @@ -134,7 +137,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {

private async init() {
this.exchange = await this.rmqNestjsConnectService.assertExchange(
this.options.exchange
this.options.exchange,
);
if (this.options.replyTo) await this.assertReplyQueueBind();
await this.bindQueueExchange();
Expand All @@ -143,11 +146,11 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
if (!this.options.queue || !this.rmqMessageTegs?.size)
return this.logger.warn(
INOF_NOT_FULL_OPTIONS,
this.options.exchange.exchange
this.options.exchange.exchange,
);
const queue = await this.rmqNestjsConnectService.assertQueue(
TypeQueue.QUEUE,
this.options.queue
this.options.queue,
);
this.rmqMessageTegs.forEach(async (_, key) => {
await this.rmqNestjsConnectService.bindQueue({
Expand All @@ -158,24 +161,24 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
});
await this.rmqNestjsConnectService.listenQueue(
this.options.queue.queue,
this.listenQueue.bind(this)
this.listenQueue.bind(this),
);
}

private async assertReplyQueueBind() {
this.replyToQueue = await this.rmqNestjsConnectService.assertQueue(
TypeQueue.REPLY_QUEUE,
{ queue: '', options: this.options.replyTo }
{ queue: '', options: this.options.replyTo },
);
await this.rmqNestjsConnectService.listenReplyQueue(
this.replyToQueue.queue,
this.listenReplyQueue.bind(this)
this.listenReplyQueue.bind(this),
);
}
private async initializationCheck() {
if (this.isInitialized) return;
await new Promise<void>((resolve) =>
setTimeout(resolve, INITIALIZATION_STEP_DELAY)
setTimeout(resolve, INITIALIZATION_STEP_DELAY),
);
await this.initializationCheck();
}
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/rmq-nestjs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ import { RmqServieController } from './rmq.controller';
}),
],
providers: [RmqEvents, RmqServieController],
exports: [RmqServieController],
exports: [RmqServieController, RmqNestjsModule],
})
export class ConnectionMockModule {}
8 changes: 4 additions & 4 deletions test/mocks/rmq.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { RmqService } from '../../lib';
export class RmqServieController {
constructor(private readonly rmqServie: RmqService) {}

async sendHi() {
const sendhi = await this.rmqServie.send<object, { message: 'hi' }>(
'hi',
{},
async sendHi(obj: any) {
const sendhi = await this.rmqServie.send<object, { message: object }>(
'text.text',
obj,
);
return sendhi;
}
Expand Down
6 changes: 3 additions & 3 deletions test/mocks/rmq.event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { MessageRoute } from '../../lib/decorators/rmq-message.decorator';

@Injectable()
export class RmqEvents {
@MessageRoute('hi')
hi() {
return { message: 'hi' };
@MessageRoute('text.text')
recived(obj: any) {
return { message: obj };
}
}
17 changes: 14 additions & 3 deletions test/rmq-nestjs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ConnectionMockModule } from './mocks/rmq-nestjs.module';
describe('RMQe2e', () => {
let api: INestApplication;
let rmqServieController: RmqServieController;

let rmqService: RmqService;
beforeAll(async () => {
const apiModule = await Test.createTestingModule({
imports: [
Expand All @@ -26,14 +26,25 @@ describe('RMQe2e', () => {

rmqServieController =
apiModule.get<RmqServieController>(RmqServieController);
rmqService = apiModule.get(RmqService);
console.warn = jest.fn();
console.log = jest.fn();
});

describe('rpc', () => {
it('check connection', async () => {
const isConnected = rmqService.healthCheck();
expect(isConnected).toBe(true);
});
it('successful send()', async () => {
const { message } = await rmqServieController.sendHi();
expect(message).toBe('hi');
const obj = { time: '001', fulled: 12 };
const { message } = await rmqServieController.sendHi(obj);
expect(message).toEqual(obj);
});
it('filed send()', async () => {
const obj = { time: '001', fulled: 12 };
const { message } = await rmqServieController.sendHi(obj);
expect(message).not.toEqual({});
});
});

Expand Down

0 comments on commit 8dc16f6

Please sign in to comment.