Skip to content

Commit

Permalink
fix: use onApplicationBootstrap to start processing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
vglebovich-lmru committed Sep 2, 2024
1 parent 19b393a commit b4b890c
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions src/consumer/kafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import vm from "vm";

import { Inject, Injectable } from "@nestjs/common";
import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common";
import {
CustomTransportStrategy,
MessageHandler,
Expand All @@ -37,7 +37,10 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces";
import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler";

@Injectable()
export class KafkaConsumer extends Server implements CustomTransportStrategy {
export class KafkaConsumer
extends Server
implements CustomTransportStrategy, OnApplicationBootstrap
{
public readonly transportId = KafkaConsumerTransportId;

private readonly messageHandlersMap: Map<
Expand All @@ -54,15 +57,18 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy {
) {
super();
}

public async close(): Promise<void> {
await Promise.all(
[...this.consumersMap.values()].map((x) => x.disconnect()),
);
}

public listen(callback: (error?: unknown) => void): void {
this.start().then(callback).catch(callback);
this.connectAndSubscribe().then(callback).catch(callback);
}

public async onApplicationBootstrap(): Promise<void> {
await this.startProcessingMessages();
}

private addMessageHandlerToMap(
Expand All @@ -81,6 +87,12 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy {
}
}

private async connectAndSubscribe(): Promise<void> {
await Promise.all([...this.consumersMap.values()].map((x) => x.connect()));

await this.subscribe();
}

private getConsumer(connectionName: string): Consumer {
const consumer = this.consumersMap.get(connectionName);

Expand All @@ -93,7 +105,7 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy {
return consumer;
}

private async run(): Promise<void> {
private async startProcessingMessages(): Promise<void> {
for (const [connectionName, consumer] of this.consumersMap) {
await consumer.run({
eachMessage: async (rawPayload) => {
Expand All @@ -111,13 +123,6 @@ export class KafkaConsumer extends Server implements CustomTransportStrategy {
}
}

private async start(): Promise<void> {
await Promise.all([...this.consumersMap.values()].map((x) => x.connect()));

await this.subscribe();
await this.run();
}

private async subscribe(): Promise<void> {
const context = vm.createContext({
topicPickerArgs: this.kafkaOptions.topicPickerArgs,
Expand Down

0 comments on commit b4b890c

Please sign in to comment.