Skip to content

Commit

Permalink
feat(@context/shared): rabbitMQ consumer and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cjuega committed Mar 7, 2024
1 parent 26f7028 commit b250b71
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type RabbitmqConfig = {
password: string;
exchange: string;
vhost?: string;
maxRetries?: number;
};

export default RabbitmqConfig;
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ export default class RabbitmqConnection {
}

async connect(): Promise<void> {
this._connection = await this.createConnection();
this._channel = await this.createChannel();
if (!this._connection) {
this._connection = await this.createConnection();
}

if (!this._channel) {
this._channel = await this.createChannel();
}
}

private async createConnection(): Promise<Connection> {
Expand Down Expand Up @@ -104,7 +109,10 @@ export default class RabbitmqConnection {

async close(): Promise<void> {
await this.channel.close();
this._channel = undefined;

await this.connection.close();
this._connection = undefined;
}

async publish(
Expand All @@ -113,12 +121,80 @@ export default class RabbitmqConnection {
message: Buffer,
options: { messageId: string; contentType: string; contentEncoding: string; priority?: number; headers?: unknown }
): Promise<void> {
if (!this._channel) {
await this.connect();
}
await this.connect();

return new Promise((resolve, reject) => {
this.channel.publish(exchange, routingKey, message, options, (err: Error) => (err ? reject(err) : resolve()));
});
}

// eslint-disable-next-line @typescript-eslint/ban-types
async consume(queueName: string, subscriber: (message: ConsumeMessage) => {}): Promise<void> {
await this.channel.consume(queueName, (msg: ConsumeMessage | null) => {
if (msg) {
subscriber(msg);
}
});
}

async handleError(message: ConsumeMessage, queueName: string): Promise<void> {
if (this.hasMaxRetriesReached(message)) {
await this.publishToDeadletter(message, queueName);
} else {
await this.publishToRetry(message, queueName);
}
}

private hasMaxRetriesReached(message: ConsumeMessage): boolean {
if (this.config.maxRetries === undefined) {
return false;
}

const count = message.properties.headers['redelivery-count'] as number;

return count >= this.config.maxRetries;
}

private async publishToRetry(message: ConsumeMessage, queueName: string): Promise<void> {
const options = RabbitmqConnection.generateMessageOptionsFromRedeliveredMessage(message);

await this.publish(RabbitmqConnection.retryName(this.config.exchange), queueName, message.content, options);
}

private async publishToDeadletter(message: ConsumeMessage, queueName: string): Promise<void> {
const options = RabbitmqConnection.generateMessageOptionsFromRedeliveredMessage(message);

await this.publish(RabbitmqConnection.deadLetterName(this.config.exchange), queueName, message.content, options);
}

private static generateMessageOptionsFromRedeliveredMessage(message: ConsumeMessage): {
messageId: string;
contentType: string;
contentEncoding: string;
priority?: number;
headers?: unknown;
} {
return {
messageId: message.properties.messageId as string,
headers: { ...message.properties.headers, ...RabbitmqConnection.incrementRedeliveryCount(message.properties.headers) },
contentType: message.properties.contentType as string,
contentEncoding: message.properties.contentEncoding as string,
priority: message.properties.priority as number
};
}

private static incrementRedeliveryCount(headers: MessagePropertyHeaders): Record<string, unknown> {
const count = headers['redelivery-count'] as number;

if (count) {
return { 'redelivery-count': count + 1 };
}

return { 'redelivery-count': 1 };
}

// eslint-disable-next-line @typescript-eslint/require-await
async ack(message: ConsumeMessage): Promise<void> {
this.channel.ack(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import EnvironmentArranger from '@src/infrastructure/arranger/environmentArranger';
import RabbitmqConfigurer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfigurer';
import RabbitmqConnection from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConnection';

export default class RabbitmqEnvironmentArranger implements EnvironmentArranger {
private readonly connection: RabbitmqConnection;

constructor(connection: RabbitmqConnection) {
private readonly configurer: RabbitmqConfigurer;

private isConfigured = false;

constructor(connection: RabbitmqConnection, configurer: RabbitmqConfigurer) {
this.connection = connection;
this.configurer = configurer;
}

async arrange(): Promise<void> {
await this.configure();
// TODO: clean up queues
}

// eslint-disable-next-line class-methods-use-this
arrange(): Promise<void> {
// TODO: implement
return Promise.resolve();
private async configure(): Promise<void> {
if (!this.isConfigured) {
await this.configurer.configure();
this.isConfigured = true;
}
}

async close(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import ObjectMother from '@src/domain/objectMother.mother';
import DomainEventJsonMarshaller from '@src/infrastructure/eventBus/marshallers/json/domainEventJsonMarshaller';
import RabbitmqClientFactory from '@src/infrastructure/eventBus/rabbitmq/rabbitmqClientFactory';
import RabbitmqConfig from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfig';
import RabbitmqConfigurer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConfigurer';
import RabbitmqEnvironmentArranger from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEnvironmentArranger';
import RabbitmqEventBus from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEventBus';
import RabbitmqEventBusConsumer from '@src/infrastructure/eventBus/rabbitmq/rabbitmqEventBusConsumer';

class DummyEvent extends DomainEvent {
static eventName = 'dummy:event';
Expand Down Expand Up @@ -58,19 +60,25 @@ const config: RabbitmqConfig = {
port: 5672,
username: 'root',
password: 'integration-test',
exchange: ''
exchange: 'domain-events',
maxRetries: 3
},
connection = RabbitmqClientFactory.createClient('integration-tests', config),
subscribers = [new DomainEventSubscriberDummy()],
marshaller = new DomainEventJsonMarshaller(new DomainEventMapping(subscribers)),
eventBus = new RabbitmqEventBus(connection, marshaller, config),
arranger = new RabbitmqEnvironmentArranger(connection);
eventBusConsumer = new RabbitmqEventBusConsumer(connection, subscribers, marshaller),
configurer = new RabbitmqConfigurer(connection, subscribers, config, { retryDelay: 1000 }),
arranger = new RabbitmqEnvironmentArranger(connection, configurer);

describe('rabbitmqEventBus', () => {
// eslint-disable-next-line jest/no-hooks
beforeEach(async () => {
await arranger.arrange();
subscribers[0].setExpectation(undefined);
// We must start the consumer before each test because the rabbitMQ connection is shared
// and the configurer closes it after configuring exchanges and queues
await eventBusConsumer.start();
});

// eslint-disable-next-line jest/no-hooks
Expand All @@ -88,4 +96,23 @@ describe('rabbitmqEventBus', () => {

expect(true).toBe(true);
});

// eslint-disable-next-line jest/no-done-callback
it('the subscriber should be called when the event it is subscribed to is published', (done) => {
expect.hasAssertions();

const event = new DummyEvent({ id: ObjectMother.uuid() });

subscribers[0].setExpectation((actual: DummyEvent) => {
expect(actual).toStrictEqual(event);
});

eventBus.publish([event])
.then(done)
.catch(() => {
// eslint-disable-next-line jest/no-conditional-expect
expect(false).toBe(true);
done();
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import DomainEvent from '@src/domain/eventBus/domainEvent';
import { DomainEventSubscriber } from '@src/domain/eventBus/domainEventSubscriber';
import { DomainEventUnmarshaller } from '@src/domain/eventBus/domainEventUnmarshaller';
import { EventBusConsumer } from '@src/domain/eventBus/eventBusConsumer';
import RabbitmqConnection from '@src/infrastructure/eventBus/rabbitmq/rabbitmqConnection';
import { ConsumeMessage } from 'amqplib';

export default class RabbitmqEventBusConsumer implements EventBusConsumer {
private readonly connection: RabbitmqConnection;

private readonly subscribers: DomainEventSubscriber<DomainEvent>[];

private readonly unmarshaller: DomainEventUnmarshaller;

constructor(connection: RabbitmqConnection, subscribers: DomainEventSubscriber<DomainEvent>[], unmarshaller: DomainEventUnmarshaller) {
this.connection = connection;
this.subscribers = subscribers;
this.unmarshaller = unmarshaller;
}

async start(): Promise<void> {
await this.connection.connect();

await Promise.all(this.subscribers.map((s) => this.connection.consume(s.name(), this.consume(s))));
}

private consume(subscriber: DomainEventSubscriber<DomainEvent>) {
return async (message: ConsumeMessage): Promise<void> => {
try {
const event = this.unmarshaller.unmarshall(message.content.toString());

await subscriber.on(event);
} catch {
// TODO: ignore errors from unmashalling as it means the service is not interested in the event
await this.connection.handleError(message, subscriber.name());
} finally {
await this.connection.ack(message);
}
};
}
}

0 comments on commit b250b71

Please sign in to comment.