diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index e8f5eb1..3814430 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable } from "@nestjs/common"; +import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,7 +37,10 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer extends Server implements CustomTransportStrategy { +export class KafkaConsumer + extends Server + implements CustomTransportStrategy, OnApplicationBootstrap +{ public readonly transportId = KafkaConsumerTransportId; private readonly messageHandlersMap: Map< @@ -54,7 +57,6 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { ) { super(); } - public async close(): Promise { await Promise.all( [...this.consumersMap.values()].map((x) => x.disconnect()), @@ -62,7 +64,11 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } public listen(callback: (error?: unknown) => void): void { - this.start().then(callback).catch(callback); + this.connectAndSubscribe().then(callback).catch(callback); + } + + public async onApplicationBootstrap(): Promise { + await this.startProcessingMessages(); } private addMessageHandlerToMap( @@ -81,6 +87,12 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } } + private async connectAndSubscribe(): Promise { + await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); + + await this.subscribe(); + } + private getConsumer(connectionName: string): Consumer { const consumer = this.consumersMap.get(connectionName); @@ -93,7 +105,7 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { return consumer; } - private async run(): Promise { + private async startProcessingMessages(): Promise { for (const [connectionName, consumer] of this.consumersMap) { await consumer.run({ eachMessage: async (rawPayload) => { @@ -111,13 +123,6 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } } - private async start(): Promise { - await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); - - await this.subscribe(); - await this.run(); - } - private async subscribe(): Promise { const context = vm.createContext({ topicPickerArgs: this.kafkaOptions.topicPickerArgs,