Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto bind event exchanges to endpoints #6

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions spec/messageType-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,16 @@ describe("messageType", () => {

expect(messageType.toString()).toBe("urn:message:Contracts:SubmitOrder")
})

it("toExchange should return namespace with name without protocol", () => {
const messageType = new MessageType("SubmitOrder")

expect(messageType.toExchange()).toBe("Messages:SubmitOrder")
})

it("toExchange should support custom namespace", () => {
const messageType = new MessageType("SubmitOrder", "Contracts")

expect(messageType.toExchange()).toBe("Contracts:SubmitOrder")
})
})
131 changes: 131 additions & 0 deletions spec/receiveEndpoint-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { defaultReceiveEndpointOptions, ReceiveEndpoint, ReceiveEndpointOptions } from '../dist/receiveEndpoint';
import { RabbitMqHostAddress } from '../src/RabbitMqEndpointAddress';
import { Bus } from '../dist/bus';
import { ConfirmChannel } from 'amqplib';
import { ChannelContext } from '../dist/channelContext';
import { MessageType } from '../dist/messageType';

describe("ReceiveEndpoint", () => {
let bus: jasmine.SpyObj<Bus>
let context: jasmine.SpyObj<ChannelContext>
let channel: jasmine.SpyObj<ConfirmChannel>

let receiveEndpoint: ReceiveEndpoint

beforeEach(() => {
bus = jasmine.createSpyObj('Bus', ['on'], {
hostAddress: new RabbitMqHostAddress({ host: '', virtualHost: '' })
});
const connection = jasmine.createSpyObj('Connection', ['createConfirmChannel']);
channel = jasmine.createSpyObj('ConfirmChannel', [
'on',
'prefetch',
'consume',
'assertExchange',
'assertQueue',
'bindQueue',
'bindExchange'
])
channel.assertQueue.and.returnValue({queue: '', messageCount: 0, consumerCount: 0} as any)
channel.consume.and.returnValue({ consumerTag: '' } as any)
context = jasmine.createSpyObj('ChannelContext', [], {
channel
})
connection.createConfirmChannel.and.returnValue(channel)
})

it("should configure default prefetch on connect", async () => {
createReceiveEndpoint()

await callOnChannel()

expect(channel.prefetch).toHaveBeenCalledWith(defaultReceiveEndpointOptions.prefetchCount, defaultReceiveEndpointOptions.globalPrefetch)
})

describe("should configure prefetch on connect", () => {
const testCases: [number, boolean][] = [
[1, false],
[1, true],
[5, true]
]
for (const [prefetchCount, globalPrefetch] of testCases) {
it(`(${prefetchCount}, ${globalPrefetch})`, async () => {
const options = {
...defaultReceiveEndpointOptions,
prefetchCount,
globalPrefetch
}
createReceiveEndpoint('', options)

await callOnChannel()

expect(channel.prefetch).toHaveBeenCalledWith(prefetchCount, globalPrefetch)
})
}
})

describe("should assertExchange on connect", () => {
const testCases: string[] = ["queue name", "another queue"]
for (const queueName of testCases) {
it(`(${queueName})`, async () => {
createReceiveEndpoint(queueName)

await callOnChannel()

expect(channel.assertExchange).toHaveBeenCalledWith(queueName, 'fanout', defaultReceiveEndpointOptions)
})
}
})

describe("should assertQueue on connect", () => {
const testCases: string[] = ["queue name", "another queue"]
for (const queueName of testCases) {
it(`(${queueName})`, async () => {
createReceiveEndpoint(queueName)

await callOnChannel()

expect(channel.assertQueue).toHaveBeenCalledWith(queueName, defaultReceiveEndpointOptions)
})
}
})

describe("should bindQueue on connect", () => {
const testCases: string[] = ["queue name", "another queue"]
for (const queueName of testCases) {
it(`(${queueName})`, async () => {
createReceiveEndpoint(queueName)

await callOnChannel()

expect(channel.bindQueue).toHaveBeenCalledWith(queueName, queueName, '')
})
}
})

describe("should bind exchanges for every event bound via handle", () => {
const testCases: [string, MessageType][] = [
["queue name", new MessageType("Event")],
["another queue", new MessageType("Message", "Contract")]
]
for (const [queueName, messageType] of testCases) {
it(`(${queueName})`, async () => {
createReceiveEndpoint(queueName)
receiveEndpoint.handle(messageType, jasmine.createSpy())

await callOnChannel()

expect(channel.bindExchange).toHaveBeenCalledWith(queueName, messageType.toExchange(), '')
})
}
})

async function callOnChannel(): Promise<void> {
// TODO: this is a hack so we can actually test the channel setup
await (receiveEndpoint as any).onChannel(context);
}

function createReceiveEndpoint(queueName: string = '', options?: ReceiveEndpointOptions) {
receiveEndpoint = new ReceiveEndpoint(bus, queueName, undefined, options)
}
})
84 changes: 84 additions & 0 deletions spec/transport-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Transport } from '../dist/transport';
import { Bus } from '../dist/bus';
import { ConnectionContext } from '../dist/connectionContext';
import { ConfirmChannel } from 'amqplib';
import { SendContext } from '../dist/sendContext';
import { RabbitMqHostAddress } from '../src/RabbitMqEndpointAddress';

describe("Transport", () => {
let bus: jasmine.SpyObj<Bus>
let context: jasmine.SpyObj<ConnectionContext>
let channel: jasmine.SpyObj<ConfirmChannel>

let transport: Transport

beforeEach(() => {
bus = jasmine.createSpyObj('Bus', ['on'], {
hostAddress: new RabbitMqHostAddress({ host: '', virtualHost: '' })
});
const connection = jasmine.createSpyObj('Connection', ['createConfirmChannel']);
context = jasmine.createSpyObj('ConnectionContext', [], {
connection
})
channel = jasmine.createSpyObj('ConfirmChannel', ['publish', 'on'])
connection.createConfirmChannel.and.returnValue(channel)

transport = new Transport(bus)
})

describe("send should publish the given message", () => {
const testCases = [
['exchange', 'routingKey'],
['MessageExchange', 'RoutingKey']
]
for (const [exchange, routingKey] of testCases) {
it(`(${exchange}, ${routingKey})`, async () => {
callPublishCallback()
await callOnConnect()

await transport.send(exchange, routingKey, new SendContext({}))

expect(channel.publish).toHaveBeenCalledWith(exchange, routingKey, jasmine.any(Buffer), jasmine.any(Object), jasmine.any(Function))
})
}
})

it("send should set the contentType", async () => {
callPublishCallback()
await callOnConnect()

await transport.send('', '', new SendContext({}))

expect(channel.publish).toHaveBeenCalledWith(jasmine.any(String), jasmine.any(String), jasmine.any(Buffer), {
persistent: true,
contentType: "application/vnd.masstransit+json"
}, jasmine.any(Function))
})

it("publishing after onConnect should set the contentType", async () => {
callPublishCallback()
transport.send('', '', new SendContext({}))

await callOnConnect()

expect(channel.publish).toHaveBeenCalledWith(jasmine.any(String), jasmine.any(String), jasmine.any(Buffer), {
persistent: true,
contentType: "application/vnd.masstransit+json"
}, jasmine.any(Function))
})

function callPublishCallback() {
channel.publish.and.callFake((exchange, routingKey, content, options, callback) => {
if (callback != null) {
callback(null, {});
}
return true;
})
}

function callOnConnect() {
const callback = bus.on.calls.first().args[1];

return callback(context);
}
})
6 changes: 3 additions & 3 deletions src/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class MassTransitBus extends EventEmitter implements Bus {

console.log('Bus stopped', this.hostAddress.toString());
}
catch (e) {
catch (e: any) {
console.error('failed to close bus', e.message);
}
}
Expand Down Expand Up @@ -135,7 +135,7 @@ class MassTransitBus extends EventEmitter implements Bus {
console.log('Connecting', this.hostAddress.toString());

try {
this.connection = connect(this.hostAddress + '?heartbeat=60');
this.connection = connect(this.hostAddress + '?heartbeat=60') as unknown as Promise<Connection>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does casting it to unknown then Promise help? Why the as unknown at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bluebird promise type seems to be incompatible with the native Promise type which is circumvented by the unknown cast.
When I cast directly to Promise I receive the following error:

TS2352: Conversion of type 'Bluebird' to type 'Promise' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.   The types returned by 'then(...)' are incompatible between these types.     Property '[Symbol.toStringTag]' is missing in type 'Bluebird' but required in type 'Promise'.

If you have a better idea I'll gladly change it.


let connection = await this.connection;

Expand All @@ -154,7 +154,7 @@ class MassTransitBus extends EventEmitter implements Bus {

this.emit('connect', {hostAddress: this.hostAddress, connection: connection});
}
catch (e) {
catch (e: any) {
console.error('Connect failed', e.message);

this.scheduleReconnect();
Expand Down
4 changes: 4 additions & 0 deletions src/messageType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,9 @@ export class MessageType {
toMessageType(): Array<string> {
return [this.toString()];
}

toExchange(): string {
return `${this.ns}:${this.name}`
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This we can get a test on pretty easily

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

6 changes: 6 additions & 0 deletions src/receiveEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ export class ReceiveEndpoint extends Transport implements ReceiveEndpointConfigu
let deserializer = new MessageTypeDeserializer<T>(this);
this._messageTypes[typeName] = deserializer;
deserializer.on(listener);
this.boundEvents.push(messageType);
}

return this;
}

private readonly _messageTypes: MessageMap;
private readonly boundEvents: MessageType[] = [];

constructor(bus: Bus, queueName: string, cb?: (cfg: ReceiveEndpointConfigurator) => void, options: ReceiveEndpointOptions = defaultReceiveEndpointOptions) {
super(bus);
Expand Down Expand Up @@ -119,6 +121,10 @@ export class ReceiveEndpoint extends Transport implements ReceiveEndpointConfigu

await channel.bindQueue(this.queueName, this.queueName, '');

for (const messageType of this.boundEvents) {
await channel.bindExchange(this.queueName, messageType.toExchange(), '');
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get a test on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests for the configureTopology method.

console.log('Queue:', queue.queue, 'MessageCount:', queue.messageCount, 'ConsumerCount:', queue.consumerCount);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class Transport extends EventEmitter implements Transport {
let pendingPublish = this.pendingPublishQueue.shift();
if (!pendingPublish) break;

// TODO: resolve or reject pending publishes
let {exchange, message, routingKey} = pendingPublish;

await this.basicPublish(exchange, routingKey, message);
Expand All @@ -127,7 +128,7 @@ export class Transport extends EventEmitter implements Transport {
if (this.channel) {
let channel = this.channel;
return new Promise((resolve, reject) => {
const result = channel.publish(exchange, routingKey, body, {persistent: true},
const result = channel.publish(exchange, routingKey, body, {persistent: true, contentType: "application/vnd.masstransit+json"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a unit test on this? I know that the unit test coverage is low for this project but any help there would be appreciated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some tests for the publish mechanism as well as the delayed publishing when sending messages before the channel connection was established.

err => {
if (err) {
reject(err);
Expand Down