Skip to content

Commit

Permalink
feat(rabbit-bus): outbox pattern first implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 28, 2024
1 parent 2150477 commit 0dce091
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 74 deletions.
7 changes: 5 additions & 2 deletions cspell.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"language": "en",
"words": ["fizzbuds"]
"language": "en",
"words": [
"fizzbuds",
"replset"
]
}
133 changes: 68 additions & 65 deletions packages/ddd-tookit-rabbit-bus/package.json
Original file line number Diff line number Diff line change
@@ -1,68 +1,71 @@
{
"name": "@fizzbuds/ddd-toolkit-rabbit-bus",
"version": "0.0.35",
"description": "",
"author": "Gabriele Toselli, Luca Giovenzana",
"private": false,
"license": "Apache-2.0",
"main": "dist/index",
"types": "dist/index.d.ts",
"scripts": {
"build": "rimraf dist && tsc --project tsconfig.build.json",
"test": "jest",
"test:coverage": "jest --coverage",
"check": "cspell lint --quiet src"
"name": "@fizzbuds/ddd-toolkit-rabbit-bus",
"version": "0.0.35",
"description": "",
"author": "Gabriele Toselli, Luca Giovenzana",
"private": false,
"license": "Apache-2.0",
"main": "dist/index",
"types": "dist/index.d.ts",
"scripts": {
"build": "rimraf dist && tsc --project tsconfig.build.json",
"test": "jest",
"test:coverage": "jest --coverage",
"check": "cspell lint --quiet src"
},
"dependencies": {
"@fizzbuds/ddd-toolkit": "workspace:^"
},
"peerDependencies": {
"amqplib": "^0.10.3",
"mongodb": "^4.0.0 || ^5.0.0"
},
"devDependencies": {
"@types/amqplib": "^0.10.5",
"@types/jest": "^29.5.2",
"@types/lodash": "^4.14.195",
"@types/node": "^20.3.1",
"@types/uuid": "^9.0.2",
"@typescript-eslint/eslint-plugin": "^6.18.1",
"@typescript-eslint/parser": "^6.18.1",
"amqplib": "^0.10.3",
"cspell": "^8.3.2",
"eslint": "^8.56.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.2",
"husky": "^8.0.0",
"jest": "^29.5.0",
"lint-staged": "^14.0.1",
"prettier": "^3.1.1",
"rimraf": "^5.0.5",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.1.3",
"mongodb": "^4.17.2",
"mongodb-memory-server": "^8.13.0"
},
"publishConfig": {
"access": "public"
},
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*spec\\.ts$",
"testPathIgnorePatterns": [
".api-spec.ts$"
],
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"dependencies": {
"@fizzbuds/ddd-toolkit": "workspace:^"
},
"peerDependencies": {
"amqplib": "^0.10.3"
},
"devDependencies": {
"@types/amqplib": "^0.10.5",
"@types/jest": "^29.5.2",
"@types/lodash": "^4.14.195",
"@types/node": "^20.3.1",
"@types/uuid": "^9.0.2",
"@typescript-eslint/eslint-plugin": "^6.18.1",
"@typescript-eslint/parser": "^6.18.1",
"amqplib": "^0.10.3",
"cspell": "^8.3.2",
"eslint": "^8.56.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.2",
"husky": "^8.0.0",
"jest": "^29.5.0",
"lint-staged": "^14.0.1",
"prettier": "^3.1.1",
"rimraf": "^5.0.5",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.1.3"
},
"publishConfig": {
"access": "public"
},
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*spec\\.ts$",
"testPathIgnorePatterns": [
".api-spec.ts$"
],
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
}
50 changes: 50 additions & 0 deletions packages/ddd-tookit-rabbit-bus/src/mongo-bus-persistence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { ClientSession, Collection, MongoClient } from 'mongodb';
import { IEvent, ILogger } from '@fizzbuds/ddd-toolkit';

export interface IBusPersistence {
init(): Promise<void>;

terminate(): Promise<void>;

persistEvent(event: IEvent<unknown>, status: 'pending' | 'published', session?: unknown): Promise<void>;

getEventStatus(event: IEvent<unknown>): Promise<'pending' | 'published' | null>;

getPendingEvents(): Promise<IEvent<unknown>[]>;
}

export class MongoBusPersistence implements IBusPersistence {
private readonly collection: Collection<{
event: IEvent<unknown>;
status: 'pending' | 'published';
}>;

constructor(
protected readonly mongoClient: MongoClient,
private readonly logger: ILogger,
) {
this.collection = this.mongoClient.db().collection('bus_persistence');
}

async init(): Promise<void> {}

async persistEvent(
event: IEvent<unknown>,
status: 'pending' | 'published',
session?: ClientSession,
): Promise<void> {
await this.collection.updateOne({ event }, { $set: { status } }, { upsert: true, session });
}

async getEventStatus(event: IEvent<unknown>): Promise<'pending' | 'published' | null> {
const doc = await this.collection.findOne({ event });
return doc?.status ?? null;
}

async terminate(): Promise<void> {}

async getPendingEvents(): Promise<IEvent<unknown>[]> {
const docs = await this.collection.find({ status: 'pending' }).toArray();
return docs.map((doc) => doc.event);
}
}
75 changes: 73 additions & 2 deletions packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Event, IEventHandler, ILogger } from '@fizzbuds/ddd-toolkit';
import { Event, IEventHandler, ILogger } from '../../ddd-tookit/src';
import { RabbitEventBus } from './rabbit-event-bus';
import { MongoBusPersistence } from './mongo-bus-persistence';
import { MongoMemoryReplSet } from 'mongodb-memory-server';
import { MongoClient } from 'mongodb';

const loggerMock: ILogger = {
log: jest.fn(),
Expand All @@ -22,6 +25,20 @@ class BarEvent extends Event<{ bar: string }> {

describe('RabbitEventBus', () => {
afterEach(() => jest.resetAllMocks());
let replset: MongoMemoryReplSet;
let mongoClient: MongoClient;
let mongoBusPersistence: MongoBusPersistence;

beforeAll(async () => {
replset = await MongoMemoryReplSet.create({ replSet: { count: 4 } });
mongoClient = await new MongoClient(replset.getUri()).connect();
mongoBusPersistence = new MongoBusPersistence(mongoClient, loggerMock);
});

afterAll(async () => {
await mongoClient.close();
await replset.stop();
});

describe('Given a RabbitEventBus instance', () => {
let rabbitEventBus: RabbitEventBus;
Expand All @@ -35,6 +52,7 @@ describe('RabbitEventBus', () => {
3,
undefined,
loggerMock,
mongoBusPersistence,
);
});

Expand Down Expand Up @@ -67,13 +85,34 @@ describe('RabbitEventBus', () => {

beforeEach(async () => await rabbitEventBus.subscribe(FooEvent, new FooEventHandler()));

describe('When publish an event', () => {
describe('When publish successfully event', () => {
it('should call the handler', async () => {
const event = new FooEvent({ foo: 'foo' });
await rabbitEventBus.publish(event);

await waitFor(() => expect(handlerMock).toBeCalledWith(event));
});

it('should persist the event with published status', async () => {
await rabbitEventBus.publish(new FooEvent({ foo: 'foo' }));

expect(await mongoBusPersistence.getEventStatus(new FooEvent({ foo: 'foo' }))).toBe(
'published',
);
});
});

describe('When publish event fail', () => {
it('should persist the event with pending status', async () => {
await rabbitEventBus.terminate();
try {
await rabbitEventBus.publish(new FooEvent({ foo: 'foo' }));
} catch (e) {}

expect(await mongoBusPersistence.getEventStatus(new FooEvent({ foo: 'foo' }))).toBe('pending');

await rabbitEventBus.init();
});
});
});

Expand Down Expand Up @@ -128,6 +167,38 @@ describe('RabbitEventBus', () => {
});
});
});

describe('Given some pending events', () => {
describe('When publishAllPendingEvents', () => {
it('should publish all pending events', async () => {
const fooHandlerMock = jest.fn();
const barHandlerMock = jest.fn();

class FooEventHandler implements IEventHandler<FooEvent> {
async handle(event: FooEvent) {
fooHandlerMock(event);
}
}

class BarEventHandler implements IEventHandler<BarEvent> {
async handle(event: BarEvent) {
barHandlerMock(event);
}
}

await rabbitEventBus.subscribe(FooEvent, new FooEventHandler());
await rabbitEventBus.subscribe(BarEvent, new BarEventHandler());

await mongoBusPersistence.persistEvent(new FooEvent({ foo: 'foo' }), 'pending');
await mongoBusPersistence.persistEvent(new BarEvent({ bar: 'bar' }), 'pending');

await rabbitEventBus.publishAllPendingEvents();

await waitFor(() => expect(fooHandlerMock).toBeCalled());
await waitFor(() => expect(barHandlerMock).toBeCalled());
});
});
});
});
});
});
Expand Down
27 changes: 23 additions & 4 deletions packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
IEventHandler,
ILogger,
IRetryMechanism,
} from '@fizzbuds/ddd-toolkit';
} from '../../ddd-tookit/src';

import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib';
import { inspect } from 'util';
import { IBusPersistence } from './mongo-bus-persistence';

export class RabbitEventBus implements IEventBus {
private amqpConnection: Connection;
Expand All @@ -26,6 +27,7 @@ export class RabbitEventBus implements IEventBus {
private readonly maxAttempts: number = 3,
private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000),
private readonly logger: ILogger,
private readonly busPersistence: IBusPersistence,
) {}

public async init(): Promise<void> {
Expand All @@ -49,11 +51,13 @@ export class RabbitEventBus implements IEventBus {
this.handlers[event.name] = handler;
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
public async publish<T extends IEvent<unknown>>(event: T, persistenceSession?: unknown): Promise<void> {
const message = this.eventToRabbitMessage(event);
await this.busPersistence.persistEvent(event, 'pending', persistenceSession);
this.producerChannel.publish(this.exchangeName, event.name, message);
await this.producerChannel.waitForConfirms();

await this.busPersistence.persistEvent(event, 'published', persistenceSession);
}

public async terminate(): Promise<void> {
Expand All @@ -62,6 +66,21 @@ export class RabbitEventBus implements IEventBus {
await this.amqpConnection.close();
}

async publishAllPendingEvents() {
for (const event of await this.busPersistence.getPendingEvents()) {
const message = this.eventToRabbitMessage(event);
this.producerChannel.publish(this.exchangeName, event.name, message);
await this.producerChannel.waitForConfirms();

await this.busPersistence.persistEvent(event, 'published');
}
}

private eventToRabbitMessage(event: IEvent<unknown>) {
const serializedEvent = JSON.stringify(event);
return Buffer.from(serializedEvent);
}

private async onMessage(rawMessage: ConsumeMessage | null) {
if (rawMessage === null) return;
const parsedMessage = JSON.parse(rawMessage.content.toString());
Expand Down
2 changes: 1 addition & 1 deletion packages/ddd-tookit/src/event-bus/event-bus.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ export interface IEventHandler<E extends IEvent<unknown>> {
export interface IEventBus {
subscribe<E extends IEvent<unknown>>(event: IEventClass<E>, handler: IEventHandler<E>): void;

publish<E extends IEvent<unknown>>(event: E): Promise<void>;
publish<E extends IEvent<unknown>>(event: E, session?: unknown): Promise<void>;
}
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0dce091

Please sign in to comment.