diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index e8f5eb1..b72c08c 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable } from "@nestjs/common"; +import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,9 +37,14 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer extends Server implements CustomTransportStrategy { +export class KafkaConsumer + extends Server + implements CustomTransportStrategy, OnApplicationBootstrap +{ public readonly transportId = KafkaConsumerTransportId; + private applicationBootstrapped = false; + private readonly messageHandlersMap: Map< string, Map @@ -62,9 +67,24 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } public listen(callback: (error?: unknown) => void): void { + if (!this.applicationBootstrapped) { + const errorMessage = + "Application is not bootstrapped. " + + "Ensure that you have called app.listen() " + + "before calling app.startAllMicroservices(). " + + "The listen method was invoked before the application finished bootstrapping."; + + this.logger.error(errorMessage); + return callback(new Error(errorMessage)); + } + this.start().then(callback).catch(callback); } + public onApplicationBootstrap(): void { + this.applicationBootstrapped = true; + } + private addMessageHandlerToMap( connectionName: string, topics: string[],