Skip to content

Commit

Permalink
feat: discover meta tags, auto binding
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jun 15, 2024
1 parent dda0247 commit 41d0fc8
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 58 deletions.
1 change: 1 addition & 0 deletions lib/common/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './meta-teg.discovery';
57 changes: 57 additions & 0 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Inject, Injectable } from '@nestjs/common';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MetaTegsMap } from '../interfaces';

@Injectable()
export class MetaTegsScannerService {
constructor(
private readonly metadataScanner: MetadataScanner,
private readonly reflector: Reflector,
private readonly modulesContainer: ModulesContainer,
@Inject('TARGET_MODULE') private readonly targetModuleName: string,
) {}

public scan(metaTeg: string) {
const rmqMessagesMap = new Map();
const modules = [...this.modulesContainer.values()];

const currentModule = modules.find(
(module) => module.metatype?.name === this.targetModuleName,
);
if (!currentModule) return rmqMessagesMap;
const providers = [...currentModule.providers.values()];
const controllers = [...currentModule.controllers.values()];
[...providers, ...controllers].forEach((provider: InstanceWrapper) => {
const { instance } = provider;
const prototype = Object.getPrototypeOf(instance);
this.metadataScanner
.getAllMethodNames(prototype)
.forEach((name: string) =>
this.lookupMethods(
metaTeg,
rmqMessagesMap,
instance,
prototype,
name,
),
);
});

return rmqMessagesMap;
}

private lookupMethods(
metaTeg: string,
rmqMessagesMap: MetaTegsMap,
instance: object,
prototype: object,
methodName: string,
) {
const method = prototype[methodName];
const event = this.reflector.get<string>(metaTeg, method);
const boundHandler = instance[methodName].bind(instance);
if (event) rmqMessagesMap.set(event, boundHandler);
}
}
2 changes: 2 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export const RMQ_CONNECT_OPTIONS = 'RMQ_CONNECT_OPTIONS';
export const RMQ_BROKER_OPTIONS = 'RMQ_BROKER_OPTIONS';
export const RMQ_MESSAGE_META_TEG = 'RMQ_MESSAGE_META_TEG';
export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM';
7 changes: 7 additions & 0 deletions lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { RMQ_MESSAGE_META_TEG } from 'lib/constants';

export function RMQMessage(event: string) {
return function (target: any, propertyKey: string | symbol, descriptor: any) {
Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value);
};
}
6 changes: 6 additions & 0 deletions lib/decorators/transform.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { applyDecorators, SetMetadata } from '@nestjs/common';
import { RMQ_ROUTES_TRANSFORM } from 'lib/constants';

export const RMQTransform = (): MethodDecorator => {
return applyDecorators(SetMetadata(RMQ_ROUTES_TRANSFORM, true));
};
6 changes: 0 additions & 6 deletions lib/interfaces/bindQueue.ts

This file was deleted.

7 changes: 0 additions & 7 deletions lib/interfaces/exchange.ts

This file was deleted.

6 changes: 3 additions & 3 deletions lib/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './connection';
export * from './exchange';
export * from './queue';
export * from './rmq-options.interface';
export * from './metategs';
export * from './rmqService';
1 change: 1 addition & 0 deletions lib/interfaces/metategs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type MetaTegsMap = Map<string | symbol, (...args: any[]) => any>;
10 changes: 0 additions & 10 deletions lib/interfaces/queue.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
import { ModuleMetadata } from '@nestjs/common';
import { IQueue } from './queue';
import { IExchange } from './exchange';
import { Options } from 'amqplib';
import { ModuleMetadata } from '@nestjs/common';

export interface IQueue {
queue: string;
options?: Options.AssertQueue;
}
export enum TypeQueue {
QUEUE,
REPLY_QUEUE,
}

export interface IExchange {
exchange: string;
type: 'direct' | 'topic' | 'headers' | 'fanout' | 'match';
options?: Options.AssertExchange;
}

export interface IRabbitMQConfig {
username: string;
Expand All @@ -20,4 +33,11 @@ export interface IMessageBroker {
exchange: IExchange;
queue?: IQueue;
replyTo: Options.AssertQueue;
targetModuleName: string;
}
export interface BindQueue {
queue: string;
source: string;
pattern: string;
args?: Record<string, any>;
}
1 change: 1 addition & 0 deletions lib/interfaces/rmqService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export interface ImqService {}
17 changes: 7 additions & 10 deletions lib/rmq-nestjs-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import {
OnModuleInit,
} from '@nestjs/common';
import { RMQ_CONNECT_OPTIONS } from './constants';
import { IRabbitMQConfig } from './interfaces/connection';
import {
IRabbitMQConfig,
IExchange,
IQueue,
TypeQueue,
BindQueue,
} from './interfaces';
import { Channel, Connection, Replies, connect } from 'amqplib';
import { IExchange } from './interfaces/exchange';
import { IQueue, TypeQueue } from './interfaces/queue';
import { BindQueue } from './interfaces/bindQueue';

@Injectable()
export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
Expand Down Expand Up @@ -66,12 +69,6 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}
}
async bindQueue(bindQueue: BindQueue): Promise<void> {
console.log(
bindQueue.queue,
bindQueue.source,
bindQueue.pattern,
bindQueue.args
);
try {
await this.baseChannel.bindQueue(
bindQueue.queue,
Expand Down
5 changes: 1 addition & 4 deletions lib/rmq-nestjs-core.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { DynamicModule, Module, Global } from '@nestjs/common';
import { RmqNestjsConnectService } from './rmq-nestjs-connect.service';
import { RMQ_CONNECT_OPTIONS } from './constants';
import {
IRMQSRootAsyncOptions,
IRabbitMQConfig,
} from './interfaces/connection';
import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces';

@Global()
@Module({})
Expand Down
12 changes: 8 additions & 4 deletions lib/rmq-nestjs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import {
IMessageBroker,
IRMQSRootAsyncOptions,
IRabbitMQConfig,
} from './interfaces/connection';
import { RMQ_BROKER_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants';
} from './interfaces';
import { RMQ_BROKER_OPTIONS } from './constants';
import { RmqNestjsCoreModule } from './rmq-nestjs-core.module';
import { RmqNestjsConnectService } from './rmq-nestjs-connect.service';
import { DiscoveryModule } from '@nestjs/core';
import { MetaTegsScannerService } from './common';

@Module({})
export class RmqNestjsModule {
Expand All @@ -26,11 +27,14 @@ export class RmqNestjsModule {
static forFeature(options: IMessageBroker): DynamicModule {
return {
module: RmqNestjsModule,
imports: [DiscoveryModule],
providers: [
{ provide: RMQ_BROKER_OPTIONS, useValue: options },
{ provide: 'TARGET_MODULE', useValue: options.targetModuleName },
RmqService,
MetaTegsScannerService,
],
exports: [RmqService],
exports: [RmqService, MetaTegsScannerService],
};
}
}
30 changes: 19 additions & 11 deletions lib/rmq-nestjs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,48 @@ import {
OnModuleInit,
} from '@nestjs/common';
import { RmqNestjsConnectService } from './rmq-nestjs-connect.service';
import { RMQ_BROKER_OPTIONS } from './constants';

import { IMessageBroker, TypeQueue } from './interfaces';
import { MetaTegsMap } from './interfaces/metategs';
import { RMQ_BROKER_OPTIONS, RMQ_MESSAGE_META_TEG } from './constants';
import { Replies } from 'amqplib';
import { MetaTegsScannerService } from './common';

@Injectable()
export class RmqService implements OnModuleInit, OnModuleDestroy {
private rmqMessageTegs: MetaTegsMap;
private replyToQueue: Replies.AssertQueue = null;
private exchange: Replies.AssertExchange = null;
constructor(
private readonly rmqNestjsConnectService: RmqNestjsConnectService,

private readonly metaTegsScannerService: MetaTegsScannerService,
@Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker
) {}

// async bindT

async onModuleInit() {
this.rmqMessageTegs =
this.metaTegsScannerService.scan(RMQ_MESSAGE_META_TEG);
await this.bindQueueExchange();
}
private async bindQueueExchange() {
try {
const exchange = await this.rmqNestjsConnectService.assertExchange(
this.exchange = await this.rmqNestjsConnectService.assertExchange(
this.options.exchange
);
if (this.options.replyTo) await this.assertReplyQueue();
if (!this.options.queue) return;
if (!this.options.queue || !this.rmqMessageTegs?.size) return;
const queue = await this.rmqNestjsConnectService.assertQueue(
TypeQueue.REPLY_QUEUE,
this.options.queue
);

await this.rmqNestjsConnectService.bindQueue({
queue: queue.queue,
source: exchange.exchange,
pattern: 'rmq-Nestjs-Connect',
args: { lopi: 12 },
this.rmqMessageTegs.forEach(async (_, key) => {
await this.rmqNestjsConnectService.bindQueue({
queue: queue.queue,
source: this.exchange.exchange,
pattern: key.toString(),
// args:any - comming soon
});
});
} catch (error) {
throw new Error(`Failed to bind queue to exchange,${error.message}`);
Expand Down

0 comments on commit 41d0fc8

Please sign in to comment.