Skip to content

Byndyusoft/nest-kafka

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

21 Commits
Jan 26, 2023
May 12, 2022
Mar 15, 2023
May 23, 2022
May 12, 2022
May 12, 2022
May 23, 2022
Jan 26, 2023
Jan 26, 2023
May 12, 2022
May 23, 2022
May 12, 2022
Oct 22, 2022
Jan 26, 2023
May 12, 2022
Jan 26, 2023
Sep 28, 2023
Jan 26, 2023
Jan 26, 2023
Jan 26, 2023
Sep 28, 2023

Repository files navigation

nest-kafka

npm@latest test code style: prettier semantic-release

Kafka for NestJS

Features

  • Multiple connections
  • Consumer and Producer with Schema Registry support (using kafkajs and @kafkajs/confluent-schema-registry under the hood)
  • Integration with nest-template
  • Consumer
    • The subscribed topic is not static; you can pick it from config
    • Process message in async context with Tracing and Logging
    • String, JSON and Schema Registry decoders for key and value, headers decoder with array support
    • Dead letter queue pattern support with smart retry mechanism
    • Support custom decoders and error handling patterns

Requirements

  • Node.js v14 LTS or later
  • Yarn

Install

yarn add @byndyusoft/nest-kafka @byndyusoft/class-validator-extended @byndyusoft/nest-opentracing @byndyusoft/nest-pino @kafkajs/confluent-schema-registry @nestjs/common @nestjs/microservices class-transformer class-validator kafkajs rxjs

Usage

Init

1. Create KafkaConfigDto
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Type } from "class-transformer";
import { IsDefined, IsString, ValidateNested } from "class-validator";

/**
 * Kafka section of the application config.
 *
 * Groups the nested cluster, consumer, producer and Schema Registry settings
 * together with the topic names used by the service. Each nested section is
 * required (`@IsDefined`) and validated through its own DTO (`@ValidateNested`);
 * `@Type` tells class-transformer which DTO class to instantiate.
 */
export class KafkaConfigDto {
  // Kafka cluster connection settings.
  @Type(() => KafkaClusterConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly cluster!: KafkaClusterConfigDto;

  // Consumer settings.
  @Type(() => KafkaConsumerConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly consumer!: KafkaConsumerConfigDto;

  // Producer settings.
  @Type(() => KafkaProducerConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly producer!: KafkaProducerConfigDto;

  // Schema Registry client arguments.
  @Type(() => KafkaSchemaRegistryArgsConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly schemaRegistry!: KafkaSchemaRegistryArgsConfigDto;

  // Name of the topic to consume from.
  @IsString()
  public readonly topic!: string;

  // Name of the topic used for failed messages (dead letter queue).
  @IsString()
  public readonly errorTopic!: string;
}
2. Add KafkaConfigDto into ConfigDto
import { Type } from "class-transformer";
import { IsDefined, ValidateNested } from "class-validator";

import { KafkaConfigDto } from "./kafkaConfigDto";

/**
 * Root application config DTO. Only the Kafka section is shown; other
 * sections are elided.
 */
export class ConfigDto {
  /// ...

  // Required Kafka settings, validated through KafkaConfigDto.
  @Type(() => KafkaConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly kafka!: KafkaConfigDto;

  /// ...
}
3. Add env variables mapping
import { Module } from "@nestjs/common";

import { ConfigDto } from "./dtos";

/**
 * Config module. This fragment shows only how the Kafka section of the
 * config is mapped from environment variables.
 */
@Module({})
export class ConfigModule {
  // ...

  /**
   * Builds the plain config object from `process.env`.
   *
   * Values are copied as-is (environment variables are strings or undefined).
   * NOTE(review): conversion to the DTO types and validation presumably happen
   * in the elided part below — confirm.
   */
  private static __loadConfig(): ConfigDto {
    const plainConfig: ConfigDto = {
      // ...
      kafka: {
        cluster: {
          // `as string` only silences the compiler; a missing variable is still undefined at runtime.
          brokers: process.env.KAFKA_BROKERS as string,
          saslMechanism: process.env.KAFKA_SASL_MECHANISM,
          username: process.env.KAFKA_USERNAME,
          password: process.env.KAFKA_PASSWORD,
          ssl: process.env.KAFKA_SSL,
          ca: process.env.KAFKA_CA,
        },
        consumer: {
          groupId: process.env.KAFKA_CONSUMER_GROUP_ID as string,
          // `??` applies the default only when the variable is unset (undefined),
          // so an empty string is passed through unchanged.
          allowAutoTopicCreation:
            process.env.KAFKA_CONSUMER_ALLOW_AUTO_TOPIC_CREATION ?? true,
          sessionTimeout: process.env.KAFKA_SESSION_TIMEOUT_MS ?? 30000,
          heartbeatInterval: process.env.KAFKA_HEARTBEAT_INTERVAL_MS ?? 3000,
        },
        producer: {
          allowAutoTopicCreation:
            process.env.KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION ?? true,
        },
        schemaRegistry: {
          host: process.env.KAFKA_SCHEMA_REGISTRY_HOST as string,
          username: process.env.KAFKA_SCHEMA_REGISTRY_USERNAME,
          password: process.env.KAFKA_SCHEMA_REGISTRY_PASSWORD,
        },
        topic: process.env.KAFKA_TOPIC as string,
        errorTopic: process.env.KAFKA_ERROR_TOPIC as string,
      },
      // ...
    };

    // ...
  }
}
4. Import KafkaModule
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a single, unnamed connection built from the
 * `kafka` section of ConfigDto.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      // ConfigDto is resolved from the DI container and passed to useFactory.
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            // Each DTO section is converted to its raw config via toRawConfig.
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
            producer: KafkaProducerConfigDto.toRawConfig(config.kafka.producer),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka.schemaRegistry,
              ),
            },
          },
        ],
        // Presumably forwarded as the arguments of `topicPicker` callbacks — confirm.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.1. You can describe multiple connections (then use the connectionName parameter in some functions to specify which connection to use)
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with two named connections, "connection1" and
 * "connection2".
 *
 * NOTE(review): assumes ConfigDto declares `kafka1` and `kafka2` sections
 * shaped like KafkaConfigDto — they are not shown in the ConfigDto example.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            // The name distinguishes this connection from the other one.
            name: "connection1",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka1.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka1.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka1.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka1.schemaRegistry,
              ),
            },
          },
          {
            name: "connection2",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka2.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka2.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka2.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka2.schemaRegistry,
              ),
            },
          },
        ],
        // A single args list is supplied for both connections.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.2. If you don't need them, you can omit the consumer, producer or schemaRegistry
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a consumer-only connection: `producer` and
 * `schemaRegistry` are left out of the connection description.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            // Only the cluster and consumer are configured for this connection.
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
          },
        ],
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
5. Connect microservice to start consuming messages
import { KafkaConsumer } from "@byndyusoft/nest-kafka";
import { MicroserviceOptions } from "@nestjs/microservices";

/**
 * Application entry point fragment: attaches the Kafka consumer as a
 * microservice transport and starts it. Creation of `app` is elided above.
 */
async function bootstrap(): Promise<void> {
  // ...

  // KafkaConsumer is resolved from the application's DI container and used
  // as the custom transport strategy of the microservice.
  app.connectMicroservice<MicroserviceOptions>({
    strategy: app.get(KafkaConsumer),
  });

  // Starts every connected microservice; resolves once they are started.
  await app.startAllMicroservices();

  // ...
}

// ...

Consuming Messages

1. Create controller and use KafkaConsumerEventPattern to describe consumer
import {
  IKafkaConsumerPayload,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";

import { ConfigDto } from "~/src";

/** Controller whose handler is bound to a Kafka topic taken from config. */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    // The topic name is read from config instead of being hard-coded.
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  public async onMessage(
    // No decoder is attached here, so the payload type parameters are left at their defaults.
    @Payload() payload: IKafkaConsumerPayload,
  ): Promise<void> {
    // ...
  }
}
2. Decode payload
import {
  IKafkaConsumerPayload,
  KafkaConsumerEventPattern,
  KafkaConsumerPayloadDecoder,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";

import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";

/** Controller whose handler receives a decoded Kafka payload. */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // The interceptor selects the "string" decoder for key and headers and
  // the "json" decoder for the value.
  @UseInterceptors(
    new KafkaConsumerPayloadDecoder({
      key: "string",
      value: "json",
      headers: "string",
    }),
  )
  public async onMessage(
    // Type arguments mirror the decoders above: string key, UserDto value.
    @Payload() payload: IKafkaConsumerPayload<string, UserDto>,
  ): Promise<void> {
    // ...
  }
}
2.1. You can use param decorators to get key, value or headers
import {
  IKafkaConsumerPayloadHeaders,
  KafkaConsumerEventPattern,
  KafkaConsumerPayloadDecoder,
  KafkaHeaders,
  KafkaKey,
  KafkaValue,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";

import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";

/**
 * Controller whose handler takes the key, value and headers as separate
 * parameters instead of the whole payload.
 */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  @UseInterceptors(
    new KafkaConsumerPayloadDecoder({
      key: "string",
      value: "json",
      headers: "string",
    }),
  )
  public async onMessage(
    // Each param decorator extracts one part of the message.
    @KafkaKey() key: string,
    @KafkaValue() value: UserDto,
    @KafkaHeaders() headers: IKafkaConsumerPayloadHeaders,
  ): Promise<void> {
    // ...
  }
}
3. Always use an exception filter for correct error handling
import {
  KafkaConsumerBaseExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/** Controller showing where an exception filter is attached to a handler. */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Placeholder: pass a concrete exception filter instance here.
  @UseFilters(/* ... */)
  public async onMessage(): Promise<void> {
    // Simulates a processing failure to be handled by the filter.
    throw new Error("some error");
  }
}
3.1. Use KafkaConsumerBaseExceptionFilter if you prefer the Stop on error pattern
import {
  KafkaConsumerBaseExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/** Controller using KafkaConsumerBaseExceptionFilter (Stop on error pattern). */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Errors thrown by the handler are passed to the base exception filter.
  @UseFilters(new KafkaConsumerBaseExceptionFilter())
  public async onMessage(): Promise<void> {
    // Simulates a processing failure.
    throw new Error("some error");
  }
}
3.2. Use KafkaConsumerErrorTopicExceptionFilter if you prefer the Dead letter queue pattern
import {
  KafkaConsumerErrorTopicExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Controller using KafkaConsumerErrorTopicExceptionFilter (Dead letter queue
 * pattern).
 */
@Controller()
export class UsersConsumer {
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  @UseFilters(
    new KafkaConsumerErrorTopicExceptionFilter({
      // The error topic name is read from config, like the consumed topic.
      // Presumably failed messages are forwarded to this topic — confirm.
      topicPicker: (config: ConfigDto) => config.kafka.errorTopic,
    }),
  )
  public async onMessage(): Promise<void> {
    // Simulates a processing failure.
    throw new Error("some error");
  }
}

Producing Messages

1. Inject KafkaProducer
import { InjectKafkaProducer, KafkaProducer } from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";

/** Service that receives a KafkaProducer through constructor injection. */
@Injectable()
export class UsersService {
  public constructor(
    // No connection name is passed to the decorator here.
    @InjectKafkaProducer()
    private readonly __kafkaProducer: KafkaProducer,
  ) {}
}

Schema Registry

1. Inject KafkaSchemaRegistry
import {
  InjectKafkaSchemaRegistry,
  KafkaSchemaRegistry,
} from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";

/** Service that receives a KafkaSchemaRegistry through constructor injection. */
@Injectable()
export class UsersService {
  public constructor(
    // No connection name is passed to the decorator here.
    @InjectKafkaSchemaRegistry()
    private readonly __kafkaSchemaRegistry: KafkaSchemaRegistry,
  ) {}
}

Maintainers

License

This repository is released under version 2.0 of the Apache License.