diff --git a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConfig.ts b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConfig.ts index 2bc7c1a..f88e45d 100644 --- a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConfig.ts +++ b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConfig.ts @@ -5,6 +5,7 @@ type RabbitmqConfig = { password: string; exchange: string; vhost?: string; + maxRetries?: number; }; export default RabbitmqConfig; diff --git a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConnection.ts b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConnection.ts index 170aee2..6519bd3 100644 --- a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConnection.ts +++ b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqConnection.ts @@ -74,8 +74,13 @@ export default class RabbitmqConnection { } async connect(): Promise { - this._connection = await this.createConnection(); - this._channel = await this.createChannel(); + if (!this._connection) { + this._connection = await this.createConnection(); + } + + if (!this._channel) { + this._channel = await this.createChannel(); + } } private async createConnection(): Promise { @@ -104,7 +109,10 @@ export default class RabbitmqConnection { async close(): Promise { await this.channel.close(); + this._channel = undefined; + await this.connection.close(); + this._connection = undefined; } async publish( @@ -113,12 +121,80 @@ export default class RabbitmqConnection { message: Buffer, options: { messageId: string; contentType: string; contentEncoding: string; priority?: number; headers?: unknown } ): Promise { - if (!this._channel) { - await this.connect(); - } + await this.connect(); return new Promise((resolve, reject) => { this.channel.publish(exchange, routingKey, message, options, (err: Error) => (err ? reject(err) : resolve())); }); } + + // eslint-disable-next-line @typescript-eslint/ban-types + async consume(queueName: string, subscriber: (message: ConsumeMessage) => {}): Promise { + await this.channel.consume(queueName, (msg: ConsumeMessage | null) => { + if (msg) { + subscriber(msg); + } + }); + } + + async handleError(message: ConsumeMessage, queueName: string): Promise { + if (this.hasMaxRetriesReached(message)) { + await this.publishToDeadletter(message, queueName); + } else { + await this.publishToRetry(message, queueName); + } + } + + private hasMaxRetriesReached(message: ConsumeMessage): boolean { + if (this.config.maxRetries === undefined) { + return false; + } + + const count = message.properties.headers['redelivery-count'] as number; + + return count >= this.config.maxRetries; + } + + private async publishToRetry(message: ConsumeMessage, queueName: string): Promise { + const options = RabbitmqConnection.generateMessageOptionsFromRedeliveredMessage(message); + + await this.publish(RabbitmqConnection.retryName(this.config.exchange), queueName, message.content, options); + } + + private async publishToDeadletter(message: ConsumeMessage, queueName: string): Promise { + const options = RabbitmqConnection.generateMessageOptionsFromRedeliveredMessage(message); + + await this.publish(RabbitmqConnection.deadLetterName(this.config.exchange), queueName, message.content, options); + } + + private static generateMessageOptionsFromRedeliveredMessage(message: ConsumeMessage): { + messageId: string; + contentType: string; + contentEncoding: string; + priority?: number; + headers?: unknown; + } { + return { + messageId: message.properties.messageId as string, + headers: { ...message.properties.headers, ...RabbitmqConnection.incrementRedeliveryCount(message.properties.headers) }, + contentType: message.properties.contentType as string, + contentEncoding: message.properties.contentEncoding as string, + priority: message.properties.priority as number + }; + } + + private static incrementRedeliveryCount(headers: MessagePropertyHeaders): Record { + const count = headers['redelivery-count'] as number; + + if (count) { + return { 'redelivery-count': count + 1 }; + } + + return { 'redelivery-count': 1 }; + } + + // eslint-disable-next-line @typescript-eslint/require-await + async ack(message: ConsumeMessage): Promise { + this.channel.ack(message); + } } diff --git a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger.ts b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger.ts index 5bdbf2e..fe47c50 100644 --- a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger.ts +++ b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger.ts @@ -1,17 +1,29 @@ import EnvironmentArranger from '@src/infrastructure/arranger/environmentArranger'; +import RabbitmqConfigurer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfigurer'; import RabbitmqConnection from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConnection'; export default class RabbitmqEnvironmentArranger implements EnvironmentArranger { private readonly connection: RabbitmqConnection; - constructor(connection: RabbitmqConnection) { + private readonly configurer: RabbitmqConfigurer; + + private isConfigured = false; + + constructor(connection: RabbitmqConnection, configurer: RabbitmqConfigurer) { this.connection = connection; + this.configurer = configurer; + } + + async arrange(): Promise { + await this.configure(); + // TODO: clean up queues } - // eslint-disable-next-line class-methods-use-this - arrange(): Promise { - // TODO: implement - return Promise.resolve(); + private async configure(): Promise { + if (!this.isConfigured) { + await this.configurer.configure(); + this.isConfigured = true; + } } async close(): Promise { diff --git a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus.test.ts b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus.test.ts index f96f406..c4e29dc 100644 --- a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus.test.ts +++ b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus.test.ts @@ -7,8 +7,10 @@ import ObjectMother from '@src/domain/objectMother.mother'; import DomainEventJsonMarshaller from '@src/infrastructure/eventBus/marshallers/json/domainEventJsonMarshaller'; import RabbitmqClientFactory from '@src/infrastructure/eventBus/rabbitmq/rabbitmqClientFactory'; import RabbitmqConfig from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfig'; +import RabbitmqConfigurer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfigurer'; import RabbitmqEnvironmentArranger from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger'; import RabbitmqEventBus from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus'; +import RabbitmqEventBusConsumer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEventBusConsumer'; class DummyEvent extends DomainEvent { static eventName = 'dummy:event'; @@ -58,19 +60,25 @@ const config: RabbitmqConfig = { port: 5672, username: 'root', password: 'integration-test', - exchange: '' + exchange: 'domain-events', + maxRetries: 3 }, connection = RabbitmqClientFactory.createClient('integration-tests', config), subscribers = [new DomainEventSubscriberDummy()], marshaller = new DomainEventJsonMarshaller(new DomainEventMapping(subscribers)), eventBus = new RabbitmqEventBus(connection, marshaller, config), - arranger = new RabbitmqEnvironmentArranger(connection); + eventBusConsumer = new RabbitmqEventBusConsumer(connection, subscribers, marshaller), + configurer = new RabbitmqConfigurer(connection, subscribers, config, { retryDelay: 1000 }), + arranger = new RabbitmqEnvironmentArranger(connection, configurer); describe('rabbitmqEventBus', () => { // eslint-disable-next-line jest/no-hooks beforeEach(async () => { await arranger.arrange(); subscribers[0].setExpectation(undefined); + // We must start the consumer before each test because the rabbitMQ connection is shared + // and the configurer closes it after configuring exchanges and queues + await eventBusConsumer.start(); }); // eslint-disable-next-line jest/no-hooks @@ -88,4 +96,23 @@ describe('rabbitmqEventBus', () => { expect(true).toBe(true); }); + + // eslint-disable-next-line jest/no-done-callback + it('the subscriber should be called when the event it is subscribed to is published', (done) => { + expect.hasAssertions(); + + const event = new DummyEvent({ id: ObjectMother.uuid() }); + + subscribers[0].setExpectation((actual: DummyEvent) => { + expect(actual).toStrictEqual(event); + }); + + eventBus.publish([event]) + .then(done) + .catch(() => { + // eslint-disable-next-line jest/no-conditional-expect + expect(false).toBe(true); + done(); + }); + }); }); diff --git a/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBusConsumer.ts b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBusConsumer.ts new file mode 100644 index 0000000..e44291b --- /dev/null +++ b/src/contexts/shared/src/infrastructure/eventBus/rabbitmq/rabbitmqEventBusConsumer.ts @@ -0,0 +1,41 @@ +import DomainEvent from '@src/domain/eventBus/domainEvent'; +import { DomainEventSubscriber } from '@src/domain/eventBus/domainEventSubscriber'; +import { DomainEventUnmarshaller } from '@src/domain/eventBus/domainEventUnmarshaller'; +import { EventBusConsumer } from '@src/domain/eventBus/eventBusConsumer'; +import RabbitmqConnection from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConnection'; +import { ConsumeMessage } from 'amqplib'; + +export default class RabbitmqEventBusConsumer implements EventBusConsumer { + private readonly connection: RabbitmqConnection; + + private readonly subscribers: DomainEventSubscriber[]; + + private readonly unmarshaller: DomainEventUnmarshaller; + + constructor(connection: RabbitmqConnection, subscribers: DomainEventSubscriber[], unmarshaller: DomainEventUnmarshaller) { + this.connection = connection; + this.subscribers = subscribers; + this.unmarshaller = unmarshaller; + } + + async start(): Promise { + await this.connection.connect(); + + await Promise.all(this.subscribers.map((s) => this.connection.consume(s.name(), this.consume(s)))); + } + + private consume(subscriber: DomainEventSubscriber) { + return async (message: ConsumeMessage): Promise => { + try { + const event = this.unmarshaller.unmarshall(message.content.toString()); + + await subscriber.on(event); + } catch { + // TODO: ignore errors from unmashalling as it means the service is not interested in the event + await this.connection.handleError(message, subscriber.name()); + } finally { + await this.connection.ack(message); + } + }; + } +}