Skip to content

vglebovich-lmru/Byndyusoft-nest-kafka

 
 

Repository files navigation

nest-kafka

npm@latest test code style: prettier semantic-release

Kafka for NestJS

Features

  • Multiple connections
  • Consumer and Producer with Schema Registry support (using kafkajs and @kafkajs/confluent-schema-registry under the hood)
  • Integration with nest-template
  • Consumer
    • The subscribed topic is not static; you can pick it from config
    • Process message in async context with Tracing and Logging
    • String, JSON and Schema Registry decoders for key and value, headers decoder with array support
    • Dead letter queue pattern support with smart retry mechanism
    • Support custom decoders and error handling patterns

Requirements

  • Node.js v14 LTS or later
  • Yarn

Install

yarn add @byndyusoft/nest-kafka @byndyusoft/class-validator-extended @byndyusoft/nest-opentracing @byndyusoft/nest-pino @kafkajs/confluent-schema-registry @nestjs/common @nestjs/microservices class-transformer class-validator kafkajs rxjs

Usage

Init

1. Create KafkaConfigDto
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Type } from "class-transformer";
import { IsDefined, IsString, ValidateNested } from "class-validator";

/**
 * Kafka section of the application config.
 *
 * Groups the cluster, consumer, producer and schema registry settings
 * together with the topic names used by the application. Nested sections
 * are declared with `@Type` + `@ValidateNested` so that they are validated
 * as their own DTO classes.
 *
 * NOTE(review): the retry examples further down read
 * `config.kafka.retryTopic`, which is not declared in this DTO. Add a
 * matching property here if you use a retry topic.
 */
export class KafkaConfigDto {
  // Settings of the Kafka cluster connection.
  @Type(() => KafkaClusterConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly cluster!: KafkaClusterConfigDto;

  // Consumer settings.
  @Type(() => KafkaConsumerConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly consumer!: KafkaConsumerConfigDto;

  // Producer settings.
  @Type(() => KafkaProducerConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly producer!: KafkaProducerConfigDto;

  // Schema Registry client arguments.
  @Type(() => KafkaSchemaRegistryArgsConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly schemaRegistry!: KafkaSchemaRegistryArgsConfigDto;

  // Name of the topic to consume from.
  @IsString()
  public readonly topic!: string;

  // Name of the error topic used by the error topic exception filter.
  @IsString()
  public readonly errorTopic!: string;
}
2. Add KafkaConfigDto into ConfigDto
import { Type } from "class-transformer";
import { IsDefined, ValidateNested } from "class-validator";

import { KafkaConfigDto } from "./kafkaConfigDto";

/**
 * Root application config DTO.
 *
 * Only the Kafka section is shown; the other sections of the config are
 * elided.
 */
export class ConfigDto {
  /// ...

  // Kafka settings, validated as a nested KafkaConfigDto.
  @Type(() => KafkaConfigDto)
  @IsDefined()
  @ValidateNested()
  public readonly kafka!: KafkaConfigDto;

  /// ...
}
3. Add env variables mapping
import { Module } from "@nestjs/common";

import { ConfigDto } from "./dtos";

/**
 * Config module example: shows how environment variables are mapped onto
 * the `kafka` section of the plain config object.
 */
@Module({})
export class ConfigModule {
  // ...

  /**
   * Builds the plain config object from `process.env`.
   *
   * The rest of the method (elided here) is expected to turn the plain
   * object into the ConfigDto that is returned.
   */
  private static __loadConfig(): ConfigDto {
    const plainConfig: ConfigDto = {
      // ...
      kafka: {
        // Cluster connection settings.
        cluster: {
          brokers: process.env.KAFKA_BROKERS as string,
          saslMechanism: process.env.KAFKA_SASL_MECHANISM,
          username: process.env.KAFKA_USERNAME,
          password: process.env.KAFKA_PASSWORD,
          ssl: process.env.KAFKA_SSL,
          ca: process.env.KAFKA_CA,
          connectionTimeout: process.env.KAFKA_CONNECTION_TIMEOUT, // default is 1 s.
        },
        // Consumer settings; `??` supplies the default only when the
        // variable is unset (an empty string is kept as is).
        consumer: {
          groupId: process.env.KAFKA_CONSUMER_GROUP_ID as string,
          allowAutoTopicCreation:
            process.env.KAFKA_CONSUMER_ALLOW_AUTO_TOPIC_CREATION ?? true,
          sessionTimeout: process.env.KAFKA_SESSION_TIMEOUT_MS ?? 30000,
          heartbeatInterval: process.env.KAFKA_HEARTBEAT_INTERVAL_MS ?? 3000,
        },
        // Producer settings.
        producer: {
          allowAutoTopicCreation:
            process.env.KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION ?? true,
        },
        // Schema Registry host and optional credentials.
        schemaRegistry: {
          host: process.env.KAFKA_SCHEMA_REGISTRY_HOST as string,
          username: process.env.KAFKA_SCHEMA_REGISTRY_USERNAME,
          password: process.env.KAFKA_SCHEMA_REGISTRY_PASSWORD,
        },
        // Topic names used by consumers and exception filters.
        topic: process.env.KAFKA_TOPIC as string,
        errorTopic: process.env.KAFKA_ERROR_TOPIC as string,
      },
      // ...
    };

    // ...
  }
}
4. Import KafkaModule
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a single connection built from the
 * application config.
 *
 * Each `*ConfigDto.toRawConfig(...)` call converts the validated DTO
 * section into the raw options expected by the connection.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
            producer: KafkaProducerConfigDto.toRawConfig(config.kafka.producer),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka.schemaRegistry,
              ),
            },
          },
        ],
        // NOTE(review): these values appear to be the arguments that
        // `topicPicker` callbacks receive — confirm against the library.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.1. You can describe multiple connections (then use the connectionName parameter in some functions to specify which connection to use)
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with two named connections.
 *
 * This example assumes that ConfigDto exposes two Kafka sections,
 * `kafka1` and `kafka2`, each shaped like KafkaConfigDto.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            // The name identifies this connection when several are registered.
            name: "connection1",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka1.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka1.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka1.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka1.schemaRegistry,
              ),
            },
          },
          {
            name: "connection2",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka2.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka2.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka2.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka2.schemaRegistry,
              ),
            },
          },
        ],
        // NOTE(review): these values appear to be the arguments that
        // `topicPicker` callbacks receive — confirm against the library.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.2. You can omit consumer, producer or schemaRegistry if you do not need them
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a consumer-only connection: the `producer`
 * and `schemaRegistry` sections are simply left out of the connection.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
          },
        ],
        // NOTE(review): these values appear to be the arguments that
        // `topicPicker` callbacks receive — confirm against the library.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
5. Connect microservice to start consuming messages
import { KafkaConsumer, KafkaRetryConsumer } from "@byndyusoft/nest-kafka";
import { MicroserviceOptions } from "@nestjs/microservices";

/**
 * Application bootstrap fragment: attaches the Kafka consumer strategies
 * to the Nest application as microservices and starts them.
 *
 * Creation of `app` and the rest of the startup sequence are elided.
 */
async function bootstrap(): Promise<void> {
  // ...

  // Main consumer strategy, resolved from the application container.
  const consumerOptions: MicroserviceOptions = {
    strategy: app.get(KafkaConsumer),
  };
  app.connectMicroservice<MicroserviceOptions>(consumerOptions);

  // you can optionally connect retry consumer
  const retryConsumerOptions: MicroserviceOptions = {
    strategy: app.get(KafkaRetryConsumer),
  };
  app.connectMicroservice<MicroserviceOptions>(retryConsumerOptions);

  await app.startAllMicroservices();

  // ...
}

// ...

Consuming Messages

1. Create controller and use KafkaConsumerEventPattern to describe consumer
import {
  IKafkaConsumerPayload,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";

import { ConfigDto } from "~/src";

/**
 * Minimal consumer: a controller method becomes a Kafka message handler
 * once it is decorated with `@KafkaConsumerEventPattern`.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  public async onMessage(
    // The incoming message, not decoded here.
    @Payload() payload: IKafkaConsumerPayload,
  ): Promise<void> {
    // ...
  }
}
2. Decode payload
import {
  IKafkaConsumerPayload,
  KafkaConsumerEventPattern,
  KafkaConsumerPayloadDecoder,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";

import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";

/**
 * Consumer whose payload is decoded by the `KafkaConsumerPayloadDecoder`
 * interceptor.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Selects a decoder per message part: string key, JSON value, string headers.
  @UseInterceptors(
    new KafkaConsumerPayloadDecoder({
      key: "string",
      value: "json",
      headers: "string",
    }),
  )
  public async onMessage(
    // Type arguments mirror the decoders above: string key, UserDto value.
    @Payload() payload: IKafkaConsumerPayload<string, UserDto>,
  ): Promise<void> {
    // ...
  }
}
2.1. You can use param decorators to get key, value or headers
import {
  IKafkaConsumerPayloadHeaders,
  KafkaConsumerEventPattern,
  KafkaConsumerPayloadDecoder,
  KafkaHeaders,
  KafkaKey,
  KafkaValue,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";

import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";

/**
 * Consumer that receives the message key, value and headers as separate
 * handler parameters instead of the whole payload.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Selects a decoder per message part: string key, JSON value, string headers.
  @UseInterceptors(
    new KafkaConsumerPayloadDecoder({
      key: "string",
      value: "json",
      headers: "string",
    }),
  )
  public async onMessage(
    // Param decorators pick the individual parts of the message.
    @KafkaKey() key: string,
    @KafkaValue() value: UserDto,
    @KafkaHeaders() headers: IKafkaConsumerPayloadHeaders,
  ): Promise<void> {
    // ...
  }
}
3. Always use some exception filter for correct error handling
import {
  KafkaConsumerBaseExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer skeleton showing where an exception filter is attached.
 * The concrete filter is left as a placeholder.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Put one of the Kafka consumer exception filters here.
  @UseFilters(/* ... */)
  public async onMessage(): Promise<void> {
    // Simulates a processing failure that the filter has to handle.
    throw new Error("some error");
  }
}
3.1. Use KafkaConsumerBaseExceptionFilter if you prefer Stop on error pattern
import {
  KafkaConsumerBaseExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer using `KafkaConsumerBaseExceptionFilter`, the filter for the
 * "stop on error" pattern.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  @UseFilters(new KafkaConsumerBaseExceptionFilter())
  public async onMessage(): Promise<void> {
    // Simulates a processing failure that the filter has to handle.
    throw new Error("some error");
  }
}
3.2. Use KafkaConsumerErrorTopicExceptionFilter if you prefer Dead letter queue pattern
import {
  KafkaConsumerErrorTopicExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer using `KafkaConsumerErrorTopicExceptionFilter`, the filter for
 * the dead letter queue pattern.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // The error topic name is read from the config, like the main topic.
  @UseFilters(
    new KafkaConsumerErrorTopicExceptionFilter({
      errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
    }),
  )
  public async onMessage(): Promise<void> {
    // Simulates a processing failure that the filter has to handle.
    throw new Error("some error");
  }
}
3.3. KafkaConsumerErrorTopicExceptionFilter also supports a retry topic for retriable errors
import {
  KafkaConsumerErrorTopicExceptionFilter,
  KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer using `KafkaConsumerErrorTopicExceptionFilter` with both a
 * retry topic (for retriable errors) and an error topic.
 *
 * NOTE(review): `config.kafka.retryTopic` must exist in your Kafka config
 * DTO for this example to compile.
 */
@Controller()
export class UsersConsumer {
  // The topic is not hard-coded: `topicPicker` reads it from the config.
  @KafkaConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.topic,
    fromBeginning: true,
  })
  // Both topic names are read from the config.
  @UseFilters(
    new KafkaConsumerErrorTopicExceptionFilter({
      retryTopicPicker: (config: ConfigDto) => config.kafka.retryTopic,
      errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
    }),
  )
  public async onMessage(): Promise<void> {
    // Simulates a processing failure that the filter has to handle.
    throw new Error("some error");
  }
}
3.4. Use retry consumer to consume messages from retry topic
import {
  KafkaConsumerErrorTopicExceptionFilter,
  KafkaRetryConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer for the retry topic.
 *
 * The handler is declared with `@KafkaRetryConsumerEventPattern` (not
 * `@KafkaConsumerEventPattern`), so it is the retry consumer that reads
 * the retry topic.
 */
@Controller()
export class UsersRetryConsumer {
  // The retry topic name is read from the config.
  @KafkaRetryConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.retryTopic,
    fromBeginning: true,
  })
  @UseFilters(
    new KafkaConsumerErrorTopicExceptionFilter({
      // No retry topic is configured for messages that are already being
      // retried; only the error topic is given.
      retryTopicPicker: false,
      errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
      // NOTE(review): presumably the prefix for headers added when a
      // message is resent — confirm against the library.
      resendHeadersPrefix: "retry",
    }),
  )
  public async onMessage(): Promise<void> {
    // Simulates a processing failure that the filter has to handle.
    throw new Error("some error");
  }
}

Run the retry consumer, e.g. by HTTP:

import { KafkaRetryConsumer } from "@byndyusoft/nest-kafka";
import { ApiTags } from "@byndyusoft/nest-swagger";
import { Body, Controller, HttpCode, HttpStatus, Post } from "@nestjs/common";

import { ConfigDto } from "~/src";

import { ApiCommonResponses } from "../infrastructure";

import { RunDeliveryAppointmentsRetryConsumerOnceBodyDto } from "./dtos";
import { RunDeliveryAppointmentsRetryConsumerOnceUseCase } from "./useCases";

/**
 * HTTP controller that triggers the retry consumer on demand.
 */
@ApiTags("Users")
@Controller({
  path: "/users/retry",
  version: "1",
})
export class UsersRetryController {
  public constructor(
    private readonly config: ConfigDto,
    private readonly kafkaRetryConsumer: KafkaRetryConsumer,
  ) {}

  /**
   * Runs the retry consumer once against the retry topic taken from the
   * config, asking it to handle one message.
   *
   * The method is `async` because it awaits `runOnce`, and the topic is
   * read through `this.config`, the injected config instance.
   */
  @ApiCommonResponses(HttpStatus.BAD_REQUEST)
  @HttpCode(HttpStatus.NO_CONTENT)
  @Post("/runRetryConsumerOnce")
  public async runDeliveryAppointmentsRetryConsumerOnce(): Promise<void> {
    await this.kafkaRetryConsumer.runOnce({
      topic: this.config.kafka.retryTopic,
      messagesCount: 1,
    });
  }
}

Producing Messages

1. Inject KafkaProducer
import { InjectKafkaProducer, KafkaProducer } from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";

/**
 * Service that receives a `KafkaProducer` through dependency injection.
 */
@Injectable()
export class UsersService {
  public constructor(
    // `@InjectKafkaProducer()` marks the parameter to be filled with the producer.
    @InjectKafkaProducer()
    private readonly __kafkaProducer: KafkaProducer,
  ) {}
}

Schema Registry

1. Inject KafkaSchemaRegistry
import {
  InjectKafkaSchemaRegistry,
  KafkaSchemaRegistry,
} from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";

/**
 * Service that receives a `KafkaSchemaRegistry` through dependency
 * injection.
 */
@Injectable()
export class UsersService {
  public constructor(
    // `@InjectKafkaSchemaRegistry()` marks the parameter to be filled with the registry client.
    @InjectKafkaSchemaRegistry()
    private readonly __kafkaSchemaRegistry: KafkaSchemaRegistry,
  ) {}
}

Maintainers

License

This repository is released under version 2.0 of the Apache License.

About

Kafka for NestJS

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • TypeScript 99.9%
  • Shell 0.1%