Skip to content

Commit

Permalink
Revert "fix: use onApplicationBootstrap to start processing messages"
Browse files Browse the repository at this point in the history
This reverts commit b4b890c.
  • Loading branch information
vglebovich-lmru committed Sep 3, 2024
1 parent b4b890c commit f626db8
Showing 1 changed file with 12 additions and 17 deletions.
29 changes: 12 additions & 17 deletions src/consumer/kafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import vm from "vm";

import { Inject, Injectable, OnApplicationBootstrap } from "@nestjs/common";
import { Inject, Injectable } from "@nestjs/common";
import {
CustomTransportStrategy,
MessageHandler,
Expand All @@ -37,10 +37,7 @@ import { IKafkaConsumerSerializedOptions } from "./interfaces";
import { KafkaConsumerMessageHandler } from "./kafkaConsumerMessageHandler";

@Injectable()
export class KafkaConsumer
extends Server
implements CustomTransportStrategy, OnApplicationBootstrap
{
export class KafkaConsumer extends Server implements CustomTransportStrategy {
public readonly transportId = KafkaConsumerTransportId;

private readonly messageHandlersMap: Map<
Expand All @@ -57,18 +54,15 @@ export class KafkaConsumer
) {
super();
}

public async close(): Promise<void> {
await Promise.all(
[...this.consumersMap.values()].map((x) => x.disconnect()),
);
}

public listen(callback: (error?: unknown) => void): void {
this.connectAndSubscribe().then(callback).catch(callback);
}

public async onApplicationBootstrap(): Promise<void> {
await this.startProcessingMessages();
this.start().then(callback).catch(callback);
}

private addMessageHandlerToMap(
Expand All @@ -87,12 +81,6 @@ export class KafkaConsumer
}
}

private async connectAndSubscribe(): Promise<void> {
await Promise.all([...this.consumersMap.values()].map((x) => x.connect()));

await this.subscribe();
}

private getConsumer(connectionName: string): Consumer {
const consumer = this.consumersMap.get(connectionName);

Expand All @@ -105,7 +93,7 @@ export class KafkaConsumer
return consumer;
}

private async startProcessingMessages(): Promise<void> {
private async run(): Promise<void> {
for (const [connectionName, consumer] of this.consumersMap) {
await consumer.run({
eachMessage: async (rawPayload) => {
Expand All @@ -123,6 +111,13 @@ export class KafkaConsumer
}
}

private async start(): Promise<void> {
await Promise.all([...this.consumersMap.values()].map((x) => x.connect()));

await this.subscribe();
await this.run();
}

private async subscribe(): Promise<void> {
const context = vm.createContext({
topicPickerArgs: this.kafkaOptions.topicPickerArgs,
Expand Down

0 comments on commit f626db8

Please sign in to comment.