Skip to content

Commit

Permalink
remove nestjs microservices package and use parseMessage from core pa…
Browse files Browse the repository at this point in the history
…ckage
  • Loading branch information
zgid123 committed Apr 9, 2023
1 parent 6751faa commit 34f5497
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 96 deletions.
1 change: 1 addition & 0 deletions examples/nestjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"dependencies": {
"@nestjs/common": "^9.4.0",
"@nestjs/core": "^9.4.0",
"@nestjs/microservices": "^9.4.0",
"@nestjs/platform-express": "^9.4.0",
"@rabbitmq-ts/nestjs-consumer": "workspace:*",
"@rabbitmq-ts/nestjs-producer": "workspace:*",
Expand Down
4 changes: 3 additions & 1 deletion examples/nestjs/src/consumer/consumer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export class ConsumerController {
@Ctx() context: RmqContext,
): Promise<string> {
console.log('data', data);
console.log('context', context);
console.log('context.getChannel()', context.getChannel());
console.log('context.getMessage()', context.getMessage());
console.log('context.getPattern()', context.getPattern());

return 'Ok!';
}
Expand Down
7 changes: 2 additions & 5 deletions packages/nestjs/consumer/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@rabbitmq-ts/nestjs-consumer",
"version": "2.0.1",
"version": "2.1.0",
"license": "MIT",
"directories": {
"lib": "lib"
Expand Down Expand Up @@ -38,10 +38,7 @@
},
"dependencies": {
"@nestjs/common": "^9.4.0",
"@nestjs/core": "^9.4.0",
"@nestjs/microservices": "^9.4.0",
"@rabbitmq-ts/core": "workspace:*",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.0"
"reflect-metadata": "^0.1.13"
}
}
9 changes: 1 addition & 8 deletions packages/nestjs/consumer/rollup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,7 @@ import typescript from '@rollup/plugin-typescript';
export default defineConfig({
input: 'src/index.ts',
plugins: [json(), resolve(), commonjs(), typescript()],
external: [
'@nestjs/microservices',
'@nestjs/common',
'@nestjs/core',
'@rabbitmq-ts/core',
'reflect-metadata',
'rxjs',
],
external: ['@nestjs/common', '@rabbitmq-ts/core', 'reflect-metadata'],
output: [
{
file: './lib/index.cjs',
Expand Down
22 changes: 13 additions & 9 deletions packages/nestjs/consumer/src/Context.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import { BaseRpcContext } from '@nestjs/microservices';

import type { Channel } from '@rabbitmq-ts/core';

type TRqmContextArgs = [Record<string, any>, Channel, string];
interface IRabbitMQContextProps {
pattern: string;
channel: Channel;
message: Record<string, any>;
}

export class RmqContext {
#context: IRabbitMQContextProps;

export class RmqContext extends BaseRpcContext<TRqmContextArgs> {
constructor(args: TRqmContextArgs) {
super(args);
constructor(context: IRabbitMQContextProps) {
this.#context = context;
}

public getMessage<T extends Record<string, any> = Record<string, any>>(): T {
return this.args[0] as T;
return this.#context.message as T;
}

public getChannel(): Channel {
return this.args[1];
return this.#context.channel;
}

public getPattern(): string {
return this.args[2];
return this.#context.pattern;
}
}
78 changes: 38 additions & 40 deletions packages/nestjs/consumer/src/RabbitMQConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import {
Server,
type MessageHandler,
type CustomTransportStrategy,
} from '@nestjs/microservices';
import { Logger } from '@nestjs/common';
import {
Connection,
parseMessage,
type Channel,
type TRepliesEmpty,
type ChannelWrapper,
Expand All @@ -14,26 +11,27 @@ import {
type TRepliesAssertExchange,
} from '@rabbitmq-ts/core';

import { omit } from './utils';
import { RmqContext } from './Context';
import { normalizePattern, omit } from './utils';

type TCallback = () => void;

interface ICreateServiceReturnProps {
strategy: RabbitMQConsumer;
}

export class RabbitMQConsumer
extends Server
implements CustomTransportStrategy
{
interface IMessageHandlerProps<TData = unknown, TResult = unknown> {
(data: TData, ctx?: RmqContext): Promise<TResult>;
}

export class RabbitMQConsumer {
#connection: Connection;
#patterns: ISubscribeParams[] = [];
#channel: ChannelWrapper | undefined;
#logger = new Logger('RabbitMQConsumer');
#messageHandlers = new Map<string, IMessageHandlerProps>();

constructor(props: IConnectionProps) {
super();

this.#connection = new Connection(props);
}

Expand All @@ -47,9 +45,9 @@ export class RabbitMQConsumer

public addHandler(
pattern: ISubscribeParams & { isRabbitMQ: boolean },
callback: MessageHandler,
isEventHandler = false,
extras: Record<string, any> = {},
callback: IMessageHandlerProps,
_isEventHandler = false,
_extras: Record<string, any> = {},
) {
if (typeof pattern === 'object' && pattern.isRabbitMQ) {
pattern = omit(pattern, ['isRabbitMQ']) as ISubscribeParams & {
Expand All @@ -59,7 +57,8 @@ export class RabbitMQConsumer
this.#patterns.push(pattern);
}

super.addHandler(pattern, callback, isEventHandler, extras);
const normalizedPattern = normalizePattern(pattern);
this.#messageHandlers.set(normalizedPattern, callback);
}

public listen(callback: TCallback) {
Expand Down Expand Up @@ -106,11 +105,7 @@ export class RabbitMQConsumer
channel.consume(
queueName,
(message) => {
return handleMessage(
message as ConsumeMessage,
channel,
pattern,
);
return handleMessage(message, channel, pattern);
},
consumerOptions,
),
Expand All @@ -122,32 +117,35 @@ export class RabbitMQConsumer
callback();
}

public close() {
this.#channel?.close();
this.#connection.close();
public async close() {
await this.#channel?.close();
await this.#connection.close();
}

async #handleMessage(
message: ConsumeMessage,
message: ConsumeMessage | null,
channel: Channel,
pattern: ISubscribeParams,
): Promise<void> {
const { content } = message;
let rawMessage = content.toString();

try {
rawMessage = JSON.parse(rawMessage);
} catch {
// do nothing
): Promise<unknown> {
if (message === null) {
return;
}

const packet = {
pattern,
data: rawMessage,
};
const patternAsString = JSON.stringify(pattern);
const rmqContext = new RmqContext([message, channel, patternAsString]);
const rawMessage = parseMessage(message);
const patternAsString = normalizePattern(pattern);
const rmqContext = new RmqContext({
message,
channel,
pattern: patternAsString,
});
const handler = this.#messageHandlers.get(patternAsString);

if (!handler) {
return this.#logger.error(
`There is no matching event handler defined in the consumer. Event pattern: ${patternAsString}`,
);
}

return this.handleEvent(patternAsString, packet, rmqContext);
return handler(rawMessage, rmqContext);
}
}
27 changes: 27 additions & 0 deletions packages/nestjs/consumer/src/decorators.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { assignMetadata } from '@nestjs/common';

import type { ISubscribeParams } from '@rabbitmq-ts/core';

const PAYLOAD_TYPE = 3;
const CONTEXT_TYPE = 6;
const ROUTE_ARGS_METADATA = '__routeArguments__';
const PATTERN_METADATA = 'microservices:pattern';
const PATTERN_HANDLER_METADATA = 'microservices:handler_type';

Expand Down Expand Up @@ -32,3 +37,25 @@ export function Subscribe({
return descriptor;
};
}

function createDecorator(type: number): () => ParameterDecorator {
return (): ParameterDecorator => {
return (target, key = '', index) => {
Reflect.defineMetadata(
ROUTE_ARGS_METADATA,
assignMetadata(
Reflect.getMetadata(ROUTE_ARGS_METADATA, target.constructor, key) ||
{},
type,
index,
),
target.constructor,
key,
);
};
};
}

export const Ctx = createDecorator(CONTEXT_TYPE);

export const Payload = createDecorator(PAYLOAD_TYPE);
5 changes: 1 addition & 4 deletions packages/nestjs/consumer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
export { Ctx, Payload } from '@nestjs/microservices';

export { Subscribe } from './decorators';

export * from './Context';
export { Subscribe, Ctx, Payload } from './decorators';
export * from './RabbitMQConsumer';
26 changes: 26 additions & 0 deletions packages/nestjs/consumer/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,29 @@ export function omit<T extends Record<string, any>, P extends keyof T>(
return result;
}, {} as Record<string, any>) as Omit<T, P>;
}

function isObject(data: unknown): boolean {
return !(data instanceof Date) && !!data && typeof data === 'object';
}

function sortedObj(obj: Record<string, any>): Record<string, any> {
return Object.keys(obj)
.sort()
.reduce<Record<string, any>>((result, key) => {
const value = obj[key];

if (Array.isArray(value)) {
result[key] = value.map((v) => sortedObj(v));
} else if (isObject(value)) {
result[key] = sortedObj(value);
} else {
result[key] = value;
}

return result;
}, {});
}

export function normalizePattern(pattern: Record<string, any>): string {
return JSON.stringify(sortedObj(pattern));
}
Loading

0 comments on commit 34f5497

Please sign in to comment.