From 19b393a293519a0378c8f74d5d3e6b7addee07d5 Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Mon, 2 Sep 2024 02:04:15 +0000 Subject: [PATCH 1/6] fix: use `onApplicationShutdown` to disconnect kafka producers --- src/producer/kafkaCoreProducer.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/producer/kafkaCoreProducer.ts b/src/producer/kafkaCoreProducer.ts index a6bbb65..8c15aac 100644 --- a/src/producer/kafkaCoreProducer.ts +++ b/src/producer/kafkaCoreProducer.ts @@ -17,7 +17,7 @@ import { Inject, Injectable, - OnModuleDestroy, + OnApplicationShutdown, OnModuleInit, } from "@nestjs/common"; import { Producer } from "kafkajs"; @@ -25,13 +25,13 @@ import { Producer } from "kafkajs"; import { ProducersMapToken } from "../consts"; @Injectable() -export class KafkaCoreProducer implements OnModuleInit, OnModuleDestroy { +export class KafkaCoreProducer implements OnModuleInit, OnApplicationShutdown { public constructor( @Inject(ProducersMapToken) private readonly producersMap: Map, ) {} - public async onModuleDestroy(): Promise { + public async onApplicationShutdown(): Promise { await Promise.all( [...this.producersMap.values()].map((x) => x.disconnect()), ); From b4b890c7557fcc0aec8f6293c5a3976995791d15 Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Mon, 2 Sep 2024 19:26:34 +0000 Subject: [PATCH 2/6] fix: use `onApplicationBootstrap` to start processing messages --- src/consumer/kafkaConsumer.ts | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index e8f5eb1..3814430 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable } from "@nestjs/common"; +import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,7 +37,10 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer extends Server implements CustomTransportStrategy { +export class KafkaConsumer + extends Server + implements CustomTransportStrategy, OnApplicationBootstrap +{ public readonly transportId = KafkaConsumerTransportId; private readonly messageHandlersMap: Map< @@ -54,7 +57,6 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { ) { super(); } - public async close(): Promise { await Promise.all( [...this.consumersMap.values()].map((x) => x.disconnect()), @@ -62,7 +64,11 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } public listen(callback: (error?: unknown) => void): void { - this.start().then(callback).catch(callback); + this.connectAndSubscribe().then(callback).catch(callback); + } + + public async onApplicationBootstrap(): Promise { + await this.startProcessingMessages(); } private addMessageHandlerToMap( @@ -81,6 +87,12 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } } + private async connectAndSubscribe(): Promise { + await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); + + await this.subscribe(); + } + private getConsumer(connectionName: string): Consumer { const consumer = this.consumersMap.get(connectionName); @@ -93,7 +105,7 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { return consumer; } - private async run(): Promise { + private async startProcessingMessages(): Promise { for (const [connectionName, consumer] of this.consumersMap) { await consumer.run({ eachMessage: async (rawPayload) => { @@ -111,13 +123,6 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } } - private async start(): Promise { - await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); - - await this.subscribe(); - await this.run(); - } - private async subscribe(): Promise { const context = vm.createContext({ topicPickerArgs: this.kafkaOptions.topicPickerArgs, From f626db84871664c85b9c24587344de61e0bc71f7 Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Tue, 3 Sep 2024 09:04:00 +0000 Subject: [PATCH 3/6] Revert "fix: use `onApplicationBootstrap` to start processing messages" This reverts commit b4b890c7557fcc0aec8f6293c5a3976995791d15. --- src/consumer/kafkaConsumer.ts | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index 3814430..e8f5eb1 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; +import { Inject, Injectable } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,10 +37,7 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer - extends Server - implements CustomTransportStrategy, OnApplicationBootstrap -{ +export class KafkaConsumer extends Server implements CustomTransportStrategy { public readonly transportId = KafkaConsumerTransportId; private readonly messageHandlersMap: Map< @@ -57,6 +54,7 @@ export class KafkaConsumer ) { super(); } + public async close(): Promise { await Promise.all( [...this.consumersMap.values()].map((x) => x.disconnect()), @@ -64,11 +62,7 @@ export class KafkaConsumer } public listen(callback: (error?: unknown) => void): void { - this.connectAndSubscribe().then(callback).catch(callback); - } - - public async onApplicationBootstrap(): Promise { - await this.startProcessingMessages(); + this.start().then(callback).catch(callback); } private addMessageHandlerToMap( @@ -87,12 +81,6 @@ export class KafkaConsumer } } - private async connectAndSubscribe(): Promise { - await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); - - await this.subscribe(); - } - private getConsumer(connectionName: string): Consumer { const consumer = this.consumersMap.get(connectionName); @@ -105,7 +93,7 @@ export class KafkaConsumer return consumer; } - private async startProcessingMessages(): Promise { + private async run(): Promise { for (const [connectionName, consumer] of this.consumersMap) { await consumer.run({ eachMessage: async (rawPayload) => { @@ -123,6 +111,13 @@ export class KafkaConsumer } } + private async start(): Promise { + await Promise.all([...this.consumersMap.values()].map((x) => x.connect())); + + await this.subscribe(); + await this.run(); + } + private async subscribe(): Promise { const context = vm.createContext({ topicPickerArgs: this.kafkaOptions.topicPickerArgs, From f468532764d2fceae440dc94ec1658081cf1413c Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Tue, 3 Sep 2024 21:32:16 +0000 Subject: [PATCH 4/6] fix: throw an error when started listening before app is boostrapped --- src/consumer/kafkaConsumer.ts | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/consumer/kafkaConsumer.ts b/src/consumer/kafkaConsumer.ts index e8f5eb1..b72c08c 100644 --- a/src/consumer/kafkaConsumer.ts +++ b/src/consumer/kafkaConsumer.ts @@ -18,7 +18,7 @@ import vm from "vm"; -import { Inject, Injectable } from "@nestjs/common"; +import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common"; import { CustomTransportStrategy, MessageHandler, @@ -37,9 +37,14 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces"; import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler"; @Injectable() -export class KafkaConsumer extends Server implements CustomTransportStrategy { +export class KafkaConsumer + extends Server + implements CustomTransportStrategy, OnApplicationBootstrap +{ public readonly transportId = KafkaConsumerTransportId; + private applicationBootstrapped = false; + private readonly messageHandlersMap: Map< string, Map @@ -62,9 +67,24 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy { } public listen(callback: (error?: unknown) => void): void { + if (!this.applicationBootstrapped) { + const errorMessage = + "Application is not bootstrapped. " + + "Ensure that you have called app.listen() " + + "before calling app.startAllMicroservices(). " + + "The listen method was invoked before the application finished bootstrapping."; + + this.logger.error(errorMessage); + return callback(new Error(errorMessage)); + } + this.start().then(callback).catch(callback); } + public onApplicationBootstrap(): void { + this.applicationBootstrapped = true; + } + private addMessageHandlerToMap( connectionName: string, topics: string[], From 7aeef8dada88012030e74d763e4e5a66b8aeef00 Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Tue, 3 Sep 2024 21:50:20 +0000 Subject: [PATCH 5/6] docs: notice about the right order of `startAllMicroservices` and `listen` --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 9d84b42..57398ae 100644 --- a/README.md +++ b/README.md @@ -329,6 +329,8 @@ async function bootstrap(): Promise { await app.startAllMicroservices(); // ... + + await app.listen(...) } // ... @@ -338,6 +340,9 @@ async function bootstrap(): Promise { ### Consuming Messages +> [!IMPORTANT] +> Put `app.startAllMicroservices()` after your `app.listen(...)` +
From 726c1c1741b2718307ce6a9aeacd0efad4d014e3 Mon Sep 17 00:00:00 2001 From: vglebovich-lmru <173266533+vglebovich-lmru@users.noreply.github.com> Date: Wed, 4 Sep 2024 12:00:22 +0000 Subject: [PATCH 6/6] docs: fix README --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 57398ae..23f443d 100644 --- a/README.md +++ b/README.md @@ -326,11 +326,12 @@ async function bootstrap(): Promise { strategy: app.get(KafkaRetryConsumer), }); + // Put `app.listen(...)` before `app.startAllMicroservice()` + await app.listen(...) + await app.startAllMicroservices(); // ... - - await app.listen(...) } // ...