Section 16: Managing a NATS Client

Table of Contents

Publishing Ticket Creation

// ticket-created-publisher.ts
import { Publisher, Subjects, TicketCreatedEvent } from '@chticketing/common';

export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
}

⬆ back to top

More on Publishing

// new.ts
await new TicketCreatedPublisher(client).publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
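The `Publisher` base class is imported from `@chticketing/common` and is not shown in these notes. As a rough sketch of what such a class might do, the block below wraps a callback-style `publish(subject, data, callback)` call in a promise. Every name in it (`PublishClient`, `BaseEvent`, `SketchPublisher`, the `'ticket:created'` subject string, the in-memory `fakeClient`) is an assumption made for illustration, not the library's actual code.

```typescript
// Minimal slice of the client API the publisher needs (assumed shape,
// modelled on a publish(subject, data, callback) method).
interface PublishClient {
  publish(subject: string, data: string, callback: (err?: Error) => void): void;
}

// Hypothetical event shape: a subject plus the payload that travels with it.
interface BaseEvent {
  subject: string;
  data: unknown;
}

abstract class SketchPublisher<T extends BaseEvent> {
  abstract subject: T['subject'];
  protected client: PublishClient;

  constructor(client: PublishClient) {
    this.client = client;
  }

  // Serialises the payload and resolves once the client acknowledges it.
  publish(data: T['data']): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.client.publish(this.subject, JSON.stringify(data), (err) => {
        if (err) {
          reject(err);
          return;
        }
        resolve();
      });
    });
  }
}

interface SketchTicketCreatedEvent {
  subject: 'ticket:created';
  data: { id: string; title: string; price: number; userId: string };
}

class SketchTicketCreatedPublisher extends SketchPublisher<SketchTicketCreatedEvent> {
  subject: 'ticket:created' = 'ticket:created';
}

// In-memory client that records what was published and acknowledges at once.
const published: { subject: string; data: string }[] = [];
const fakeClient: PublishClient = {
  publish(subject, data, callback) {
    published.push({ subject, data });
    callback();
  },
};

void new SketchTicketCreatedPublisher(fakeClient).publish({
  id: 'abc',
  title: 'concert',
  price: 20,
  userId: 'u1',
});
```

Typing `publish` against `T['data']` means the compiler rejects a payload that is missing a field such as `userId`, which is the main benefit of pairing each publisher with its event type.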

⬆ back to top

NATS Client Singleton

Cyclic Dependency Issue

⬆ back to top

Remember Mongoose?

// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';

class NatsWrapper {}

export const natsWrapper = new NatsWrapper();

⬆ back to top

Singleton Implementation

// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';

class NatsWrapper {
  private _client?: Stan;

  connect(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });

    return new Promise<void>((resolve, reject) => {
      this._client!.on('connect', () => {
        console.log('Connected to NATS');
        resolve();
      });
      this._client!.on('error', (err) => {
        reject(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();
// index.ts
  try {
    await natsWrapper.connect(
      'ticketing', 
      'dhghad', 
      'http://nats-srv:4222'
    );
  ...
  }

⬆ back to top

Accessing the NATS Client

// nats-wrapper.ts
import nats, { Stan } from 'node-nats-streaming';

class NatsWrapper {
  private _client?: Stan;

  get client() {
    if (!this._client) {
      throw new Error('Cannot access NATS client before connecting');
    }

    return this._client;
  }

  connect(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });

    return new Promise<void>((resolve, reject) => {
      this.client.on('connect', () => {
        console.log('Connected to NATS');
        resolve();
      });
      this.client.on('error', (err) => {
        reject(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();
// new.ts
import { natsWrapper } from '../nats-wrapper';

  await new TicketCreatedPublisher(natsWrapper.client).publish({
    id: ticket.id,
    title: ticket.title,
    price: ticket.price,
    userId: ticket.userId,
  });
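The getter's guard can be exercised without a running NATS server. The sketch below is a stripped-down stand-in, not the real wrapper: `FakeClient`, `WrapperSketch`, and its synchronous `connect` are hypothetical names used only to show that reading `client` before connecting throws and reading it afterwards does not.

```typescript
// Stand-in for the real client type; only here so the sketch runs on its own.
interface FakeClient {
  name: string;
}

class WrapperSketch {
  private _client?: FakeClient;

  // Same guard as the getter above: fail loudly if connect() has not run yet.
  get client(): FakeClient {
    if (!this._client) {
      throw new Error('Cannot access NATS client before connecting');
    }
    return this._client;
  }

  // Hypothetical connect that assigns the client without any network I/O.
  connect(name: string): void {
    this._client = { name };
  }
}

const wrapper = new WrapperSketch();

// Reading the client too early should throw; capture the message.
let errorBeforeConnect = '';
try {
  void wrapper.client;
} catch (err) {
  errorBeforeConnect = (err as Error).message;
}

// After connecting, the same getter returns the client.
wrapper.connect('tickets');
const clientName = wrapper.client.name;
```

Because the getter's return type excludes `undefined`, callers such as `new.ts` need no null checks or `!` assertions of their own.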

⬆ back to top

Graceful Shutdown

// index.ts
  natsWrapper.client.on('close', () => {
    console.log('NATS connection closed!');
    process.exit();
  });
  process.on('SIGINT', () => natsWrapper.client.close());
  process.on('SIGTERM', () => natsWrapper.client.close());
kubectl get pods
kubectl delete pod nats-depl-8658cfccf-r9bt8

⬆ back to top

Successful Listen!

  • Connected to NATS
skaffold dev
kubectl get pods
kubectl port-forward nats-depl-8658cfccf-jnb7b 4222:4222
  • Listener connected to NATS
cd section-16/ticketing/nats-test
npm run listen
  • Create Ticket with Postman
  • Check that the listener receives the ticket-created event

⬆ back to top

Ticket Update Publishing

import { Publisher, Subjects, TicketUpdatedEvent } from '@chticketing/common';

export class TicketUpdatedPublisher extends Publisher<TicketUpdatedEvent> {
  subject: Subjects.TicketUpdated = Subjects.TicketUpdated;
}
  new TicketUpdatedPublisher(natsWrapper.client).publish({
    id: ticket.id,
    title: ticket.title,
    price: ticket.price,
    userId: ticket.userId
  });

⬆ back to top

Failed Event Publishing

⬆ back to top

Handling Publish Failures

⬆ back to top

Fixing a Few Tests

⬆ back to top

Redirecting Imports

Mocking (Faking) Imports with Jest

  • Find the file that we want to 'fake'
  • In the same directory, create a folder called '__mocks__'
  • In that folder, create a file with an identical name to the file we want to fake
  • Write a fake implementation
  • Tell Jest to use that fake file in our test file
jest.mock('../../nats-wrapper');

⬆ back to top

Providing a Mock Implementation

export const natsWrapper = {
  client: {
    publish: (subject: string, data: string, callback: () => void) => {
      callback();
    },
  },
};
// new.test.ts
jest.mock('../../nats-wrapper');

⬆ back to top

Test-Suite Wide Mocks

// setup.ts
jest.mock('../nats-wrapper');

⬆ back to top

Ensuring Mock Invocations

export const natsWrapper = {
  client: {
    publish: jest.fn().mockImplementation(
      (subject: string, data: string, callback: () => void) => {
        callback();
    })
  },
};
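Wrapping the fake `publish` in `jest.fn()` lets a test check that an event was actually published, for example with `expect(natsWrapper.client.publish).toHaveBeenCalled()`. To show what that call recording amounts to without depending on Jest, here is a hand-rolled stand-in. `recordCalls`, `publishSpy`, and `natsWrapperSketch` are hypothetical names for this illustration, and the `'ticket:created'` subject is an assumed value.

```typescript
type PublishCallback = () => void;
type PublishArgs = [subject: string, data: string, callback: PublishCallback];

// Hand-rolled stand-in for jest.fn(): runs the implementation and records each call.
function recordCalls(impl: (...args: PublishArgs) => void) {
  const calls: PublishArgs[] = [];
  const fn = (...args: PublishArgs): void => {
    calls.push(args);
    impl(...args);
  };
  return { fn, calls };
}

// Fake publish that acknowledges immediately, like the mock above.
const publishSpy = recordCalls((_subject, _data, callback) => callback());

const natsWrapperSketch = {
  client: { publish: publishSpy.fn },
};

// Code under test would publish through the wrapper like this.
let ackCount = 0;
natsWrapperSketch.client.publish('ticket:created', JSON.stringify({ id: 'abc' }), () => {
  ackCount += 1;
});
```

A test can then assert on `publishSpy.calls`, which is the same information Jest exposes through matchers such as `toHaveBeenCalled()`.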

⬆ back to top

NATS Env Variables

  - name: NATS_CLUSTER_ID
    value: 'ticketing'
  - name: NATS_CLIENT_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  - name: NATS_URL
    value: 'http://nats-srv:4222'
  await natsWrapper.connect(
    process.env.NATS_CLUSTER_ID, 
    process.env.NATS_CLIENT_ID, 
    process.env.NATS_URL
  );
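Under strict TypeScript settings each `process.env` value is typed `string | undefined`, so passing them directly to `connect` does not type-check. One possible way to handle this is to validate each variable at startup. The sketch below uses a hypothetical `requireEnv` helper and a `sampleEnv` object in place of `process.env` so it runs on its own; the pod name `tickets-depl-abc123` is made up.

```typescript
// Hypothetical helper: returns the variable's value or throws if it is unset,
// so the result is typed as string rather than string | undefined.
function requireEnv(env: Record<string, string | undefined>, name: string): string {
  const value = env[name];
  if (!value) {
    throw new Error(`${name} must be defined`);
  }
  return value;
}

// Sample environment for illustration; a real service would pass process.env.
const sampleEnv: Record<string, string | undefined> = {
  NATS_CLUSTER_ID: 'ticketing',
  NATS_CLIENT_ID: 'tickets-depl-abc123',
  NATS_URL: 'http://nats-srv:4222',
};

// Values that could then be handed to natsWrapper.connect(...).
const natsConfig = {
  clusterId: requireEnv(sampleEnv, 'NATS_CLUSTER_ID'),
  clientId: requireEnv(sampleEnv, 'NATS_CLIENT_ID'),
  url: requireEnv(sampleEnv, 'NATS_URL'),
};

// A missing variable fails fast with a descriptive message.
let missingMessage = '';
try {
  requireEnv({}, 'NATS_URL');
} catch (err) {
  missingMessage = (err as Error).message;
}
```

Failing at startup makes a misconfigured deployment visible immediately, rather than surfacing later as a confusing connection error.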

⬆ back to top