- Section 16: Managing a NATS Client
- Table of Contents
- Publishing Ticket Creation
- More on Publishing
- NATS Client Singleton
- Remember Mongoose?
- Singleton Implementation
- Accessing the NATS Client
- Graceful Shutdown
- Successful Listen!
- Ticket Update Publishing
- Failed Event Publishing
- Handling Publish Failures
- Fixing a Few Tests
- Redirecting Imports
- Providing a Mock Implementation
- Test-Suite Wide Mocks
- Ensuring Mock Invocations
- NATS Env Variables
// ticket-created-publisher.ts
import { Publisher, Subjects, TicketCreatedEvent } from '@chticketing/common';
/**
 * Publisher for `TicketCreatedEvent` messages.
 *
 * The class only pins `subject` to `Subjects.TicketCreated`; it declares no
 * other members, so everything else is inherited from the `Publisher` base
 * class.
 */
export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
// The annotation keeps the property typed as this exact enum member.
subject: Subjects.TicketCreated = Subjects.TicketCreated;
}
// new.ts
// Build the publisher first, then emit the ticket's fields as the event
// payload and wait for the publish to finish.
const ticketCreatedPublisher = new TicketCreatedPublisher(client);
await ticketCreatedPublisher.publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
Cyclic Dependency Issue
// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';
/** Placeholder for the NATS client wrapper; it has no members at this step. */
class NatsWrapper {}
// One shared instance is created here and exported, so every importer of this
// module receives the same object.
export const natsWrapper = new NatsWrapper();
// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';
/**
 * Holds a single NATS Streaming client and knows how to connect it.
 * One instance is created below and exported as a module-level singleton.
 */
class NatsWrapper {
  /** The underlying client; stays undefined until `connect` is called. */
  private _client?: Stan;

  /**
   * Creates the client and waits for the connection to be established.
   *
   * @param clusterId - Cluster ID passed through to `nats.connect`.
   * @param clientId - Client ID passed through to `nats.connect`.
   * @param url - Server URL, passed to `nats.connect` as the `url` option.
   * @returns A promise that resolves when the client emits `connect` and
   *   rejects with the emitted error when the client emits `error`.
   */
  connect(clusterId: string, clientId: string, url: string): Promise<void> {
    // Keep a local reference so the callbacks below need no non-null assertion.
    const client = nats.connect(clusterId, clientId, { url });
    this._client = client;

    // The explicit <void> is required: without it the promise is
    // Promise<unknown> and calling resolve() with no argument does not compile.
    return new Promise<void>((resolve, reject) => {
      client.on('connect', () => {
        console.log('Connected to NATS');
        resolve();
      });
      client.on('error', (err) => {
        reject(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();
try {
// Wait for the NATS connection before continuing.
// Arguments follow connect(clusterId, clientId, url).
await natsWrapper.connect(
'ticketing', // cluster ID
'dhghad', // client ID
'http://nats-srv:4222' // server URL
);
...
}
// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';
/**
 * Holds a single NATS Streaming client, exposes it through a guarded getter,
 * and knows how to connect it. One instance is created below and exported as
 * a module-level singleton.
 */
class NatsWrapper {
  /** The underlying client; stays undefined until `connect` is called. */
  private _client?: Stan;

  /**
   * The connected client.
   *
   * @throws Error if accessed before `connect` has assigned a client.
   */
  get client(): Stan {
    if (!this._client) {
      throw new Error('Cannot access NATS client before connecting');
    }
    return this._client;
  }

  /**
   * Creates the client and waits for the connection to be established.
   *
   * @param clusterId - Cluster ID passed through to `nats.connect`.
   * @param clientId - Client ID passed through to `nats.connect`.
   * @param url - Server URL, passed to `nats.connect` as the `url` option.
   * @returns A promise that resolves when the client emits `connect` and
   *   rejects with the emitted error when the client emits `error`.
   */
  connect(clusterId: string, clientId: string, url: string): Promise<void> {
    this._client = nats.connect(clusterId, clientId, { url });

    // The explicit <void> is required: without it the promise is
    // Promise<unknown> and calling resolve() with no argument does not compile.
    return new Promise<void>((resolve, reject) => {
      // The getter is safe here because _client was assigned just above.
      this.client.on('connect', () => {
        console.log('Connected to NATS');
        resolve();
      });
      this.client.on('error', (err) => {
        reject(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();
// new.ts
import { natsWrapper } from'../nats-wrapper';
// Build the publisher on the shared NATS client, then emit the ticket's
// fields as the event payload and wait for the publish to finish.
const ticketCreatedPublisher = new TicketCreatedPublisher(natsWrapper.client);
await ticketCreatedPublisher.publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
// index.ts
// Once the NATS connection reports that it has closed, log it and end the process.
natsWrapper.client.on('close', () => {
  console.log('NATS connection closed!');
  process.exit();
});

// Interrupt and terminate signals both ask the client to close, which in turn
// triggers the 'close' handler registered above.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, () => natsWrapper.client.close());
}
kubectl get pods
kubectl delete pod nats-depl-8658cfccf-r9bt8
- Connected to NATS
skaffold dev
kubectl get pods
kubectl port-forward nats-depl-8658cfccf-jnb7b 4222:4222
- Listener connected to NATS
cd section-16/ticketing/nats-test
npm run listen
- Create Ticket with Postman
- Check that the listener receives the ticket-created event
import { Publisher, Subjects, TicketUpdatedEvent } from '@chticketing/common';
/**
 * Publisher for `TicketUpdatedEvent` messages.
 *
 * The class only pins `subject` to `Subjects.TicketUpdated`; it declares no
 * other members, so everything else is inherited from the `Publisher` base
 * class.
 */
export class TicketUpdatedPublisher extends Publisher<TicketUpdatedEvent> {
// The annotation keeps the property typed as this exact enum member.
subject: Subjects.TicketUpdated = Subjects.TicketUpdated;
}
// Await the publish so that a failed publish rejects here, where the caller
// can handle it, instead of surfacing as an unhandled promise rejection.
await new TicketUpdatedPublisher(natsWrapper.client).publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
Mocking (Faking) Imports with Jest
- Find the file that we want to 'fake'
- In the same directory, create a folder called `__mocks__`
- In that folder, create a file with an identical name to the file we want to fake
- Write a fake implementation
- Tell jest to use that fake file in our test file
// Tell Jest to use the fake nats-wrapper implementation in this test file.
jest.mock('../../nats-wrapper');
/**
 * Fake NATS wrapper for tests. Its `client.publish` ignores the subject and
 * data and invokes the callback immediately.
 */
export const natsWrapper = {
  client: {
    publish(subject: string, data: string, callback: () => void): void {
      callback();
    },
  },
};
// new.test.ts
// Replace the real nats-wrapper module with its fake implementation for the tests in this file.
jest.mock('../../nats-wrapper');
// setup.ts
// Mock the nats-wrapper module from this file rather than from an individual test file.
// NOTE(review): presumably this is the shared test setup file, so the mock applies suite-wide; confirm.
jest.mock('../nats-wrapper');
/** Stand-in for the real publish: ignores the subject and data and calls back at once. */
const invokeCallbackImmediately = (
  subject: string,
  data: string,
  callback: () => void
): void => {
  callback();
};

/**
 * Fake NATS wrapper for tests. `client.publish` is a Jest mock function, so
 * tests can inspect how it was called, and it still invokes the callback.
 */
export const natsWrapper = {
  client: {
    publish: jest.fn().mockImplementation(invokeCallbackImmediately),
  },
};
# NATS connection settings, exposed to the container as environment variables.
- name: NATS_CLUSTER_ID
  value: 'ticketing'
# The client ID is taken from the pod's own metadata.name rather than hard-coded.
- name: NATS_CLIENT_ID
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
- name: NATS_URL
  value: 'http://nats-srv:4222'
// Fail fast on missing configuration. Each process.env value is
// `string | undefined`, while connect() requires plain strings; these guards
// also narrow the types for the call below.
if (!process.env.NATS_CLUSTER_ID) {
  throw new Error('NATS_CLUSTER_ID must be defined');
}
if (!process.env.NATS_CLIENT_ID) {
  throw new Error('NATS_CLIENT_ID must be defined');
}
if (!process.env.NATS_URL) {
  throw new Error('NATS_URL must be defined');
}

await natsWrapper.connect(
  process.env.NATS_CLUSTER_ID,
  process.env.NATS_CLIENT_ID,
  process.env.NATS_URL
);