diff --git a/README.md b/README.md index 9d84b42..23f443d 100644 --- a/README.md +++ b/README.md @@ -326,6 +326,9 @@ async function bootstrap(): Promise { strategy: app.get(KafkaRetryConsumer), }); + // Put `app.listen(...)` before `app.startAllMicroservice()` + await app.listen(...) + await app.startAllMicroservices(); // ... @@ -338,6 +341,9 @@ async function bootstrap(): Promise { ### Consuming Messages +> [!IMPORTANT] +> Put `app.startAllMicroservices()` after your `app.listen(...)` +
diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index e8f5eb1..b72c08c 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable } from "@nestjs/common"; +import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,9 +37,14 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer extends Server implements CustomTransportStrategy { +export class KafkaConsumer + extends Server + implements CustomTransportStrategy, OnApplicationBootstrap +{ public readonly transportId = KafkaConsumerTransportId; + private applicationBootstrapped = false; + private readonly messageHandlersMap: Map< string, Map @@ -62,9 +67,24 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } public listen(callback: (error?: unknown) => void): void { + if (!this.applicationBootstrapped) { + const errorMessage = + "Application is not bootstrapped. " + + "Ensure that you have called app.listen() " + + "before calling app.startAllMicroservices(). " + + "The listen method was invoked before the application finished bootstrapping."; + + this.logger.error(errorMessage); + return callback(new Error(errorMessage)); + } + this.start().then(callback).catch(callback); } + public onApplicationBootstrap(): void { + this.applicationBootstrapped = true; + } + private addMessageHandlerToMap( connectionName: string, topics: string[], diff --git a/src/producer/kafkaCoreProducer.ts b/src/producer/kafkaCoreProducer.ts index a6bbb65..8c15aac 100644 --- a/src/producer/kafkaCoreProducer.ts +++ b/src/producer/kafkaCoreProducer.ts @@ -17,7 +17,7 @@ import { Inject, Injectable, - OnModuleDestroy, + OnApplicationShutdown, OnModuleInit, } from "@nestjs/common"; import { Producer } from "kafkajs"; @@ -25,13 +25,13 @@ import { Producer } from "kafkajs"; import { ProducersMapToken } from "../consts"; @Injectable() -export class KafkaCoreProducer implements OnModuleInit, OnModuleDestroy { +export class KafkaCoreProducer implements OnModuleInit, OnApplicationShutdown { public constructor( @Inject(ProducersMapToken) private readonly producersMap: Map, ) {} - public async onModuleDestroy(): Promise { + public async onApplicationShutdown(): Promise { await Promise.all( [...this.producersMap.values()].map((x) => x.disconnect()), );