Skip to content

Commit

Permalink
fix: use onApplicationShutdown to disconnect kafka producers
Browse files Browse the repository at this point in the history
and throw an error when started listening before app is boostrapped
  • Loading branch information
vglebovich-lmru authored Sep 4, 2024
1 parent cd7bdea commit bb1491d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ async function bootstrap(): Promise<void> {
strategy: app.get(KafkaRetryConsumer),
});

// Put `app.listen(...)` before `app.startAllMicroservice()`
await app.listen(...)

await app.startAllMicroservices();

// ...
Expand All @@ -338,6 +341,9 @@ async function bootstrap(): Promise<void> {

### Consuming Messages

> [!IMPORTANT]
> Put `app.startAllMicroservices()` after your `app.listen(...)`
</details>

<details>
Expand Down
24 changes: 22 additions & 2 deletions src/consumer/kafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import vm from "vm";

import { Inject, Injectable } from "@nestjs/common";
import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common";
import {
CustomTransportStrategy,
MessageHandler,
Expand All @@ -37,9 +37,14 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces";
import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler";

@Injectable()
export class KafkaConsumer extends Server implements CustomTransportStrategy {
export class KafkaConsumer
extends Server
implements CustomTransportStrategy, OnApplicationBootstrap
{
public readonly transportId = KafkaConsumerTransportId;

private applicationBootstrapped = false;

private readonly messageHandlersMap: Map<
string,
Map<string, MessageHandler>
Expand All @@ -62,9 +67,24 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy {
}

public listen(callback: (error?: unknown) => void): void {
if (!this.applicationBootstrapped) {
const errorMessage =
"Application is not bootstrapped. " +
"Ensure that you have called app.listen() " +
"before calling app.startAllMicroservices(). " +
"The listen method was invoked before the application finished bootstrapping.";

this.logger.error(errorMessage);
return callback(new Error(errorMessage));
}

this.start().then(callback).catch(callback);
}

public onApplicationBootstrap(): void {
this.applicationBootstrapped = true;
}

private addMessageHandlerToMap(
connectionName: string,
topics: string[],
Expand Down
6 changes: 3 additions & 3 deletions src/producer/kafkaCoreProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@
import {
Inject,
Injectable,
OnModuleDestroy,
OnApplicationShutdown,
OnModuleInit,
} from "@nestjs/common";
import { Producer } from "kafkajs";

import { ProducersMapToken } from "../consts";

@Injectable()
export class KafkaCoreProducer implements OnModuleInit, OnModuleDestroy {
export class KafkaCoreProducer implements OnModuleInit, OnApplicationShutdown {
public constructor(
@Inject(ProducersMapToken)
private readonly producersMap: Map<string, Producer>,
) {}

public async onModuleDestroy(): Promise<void> {
public async onApplicationShutdown(): Promise<void> {
await Promise.all(
[...this.producersMap.values()].map((x) => x.disconnect()),
);
Expand Down

0 comments on commit bb1491d

Please sign in to comment.