From 8fb684af81adfb6e24ad3413472b4c373d33fdba Mon Sep 17 00:00:00 2001 From: KillWolfVlad Date: Wed, 17 Jan 2024 15:41:06 +0500 Subject: [PATCH] feat: add resendHeadersPrefix in error topic exception filter --- README.md | 1 + .../kafkaConsumerErrorTopicExceptionFilter.ts | 23 +++++++++++-------- ...rorTopicExceptionFilterOptionsInterface.ts | 5 ++++ 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 15ed43a..9d84b42 100644 --- a/README.md +++ b/README.md @@ -590,6 +590,7 @@ export class UsersRetryConsumer { new KafkaConsumerErrorTopicExceptionFilter({ retryTopicPicker: false, errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic, + resendHeadersPrefix: "retry", }), ) public async onMessage(): Promise { diff --git a/src/consumer/exceptions/kafkaConsumerErrorTopicExceptionFilter.ts b/src/consumer/exceptions/kafkaConsumerErrorTopicExceptionFilter.ts index 33c1f1d..1609d2e 100644 --- a/src/consumer/exceptions/kafkaConsumerErrorTopicExceptionFilter.ts +++ b/src/consumer/exceptions/kafkaConsumerErrorTopicExceptionFilter.ts @@ -22,6 +22,7 @@ import { Logger, RpcExceptionFilter, } from "@nestjs/common"; +import { IHeaders } from "kafkajs"; import { from, Observable, throwError } from "rxjs"; import { @@ -103,6 +104,7 @@ export class KafkaConsumerErrorTopicExceptionFilter host, this.options.connectionName ?? context.connectionName, topic, + this.options.resendHeadersPrefix, ), ); } @@ -130,6 +132,7 @@ export class KafkaConsumerErrorTopicExceptionFilter host: ArgumentsHost, connectionName: string, topic: string, + resendHeadersPrefix = "original", ): Promise { const rpcHost = host.switchToRpc(); const payload: IKafkaConsumerPayload = rpcHost.getData(); @@ -139,21 +142,23 @@ export class KafkaConsumerErrorTopicExceptionFilter this.logger.warn(`Send message to ${topic}`); + const headers: IHeaders = { + ...payload.rawPayload.message.headers, + [`${resendHeadersPrefix}Topic`]: payload.rawPayload.topic, + [`${resendHeadersPrefix}Partition`]: String(payload.rawPayload.partition), + [`${resendHeadersPrefix}Offset`]: payload.rawPayload.message.offset, + [`${resendHeadersPrefix}Timestamp`]: payload.rawPayload.message.timestamp, + [`${resendHeadersPrefix}TraceId`]: context.traceId, + error: JSON.stringify(serializeError(cause)), + }; + await context.kafkaCoreProducer.send(connectionName, { topic, messages: [ { key: payload.rawPayload.message.key, value: payload.rawPayload.message.value, - headers: { - ...payload.rawPayload.message.headers, - originalTopic: payload.rawPayload.topic, - originalPartition: String(payload.rawPayload.partition), - originalOffset: payload.rawPayload.message.offset, - originalTimestamp: payload.rawPayload.message.timestamp, - originalTraceId: context.traceId, - error: JSON.stringify(serializeError(cause)), - }, + headers, }, ], }); diff --git a/src/consumer/interfaces/kafkaConsumerErrorTopicExceptionFilterOptionsInterface.ts b/src/consumer/interfaces/kafkaConsumerErrorTopicExceptionFilterOptionsInterface.ts index 7ea72e1..7dc1085 100644 --- a/src/consumer/interfaces/kafkaConsumerErrorTopicExceptionFilterOptionsInterface.ts +++ b/src/consumer/interfaces/kafkaConsumerErrorTopicExceptionFilterOptionsInterface.ts @@ -27,4 +27,9 @@ export interface IKafkaConsumerErrorTopicExceptionFilterOptions { * @deprecated Use errorTopicPicker instead */ readonly topicPicker?: ((...args: any[]) => string) | false; + + /** + * @default "original" + */ + readonly resendHeadersPrefix?: string; }