From 271e459a5deb13f77906fb58c8308151ef6415a1 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 11 Oct 2024 15:17:11 -0700 Subject: [PATCH] feat: add keys into kafka header while producing (#2143) Signed-off-by: Vigith Maurice --- docs/user-guide/sinks/kafka.md | 10 ++++++++++ pkg/sinks/kafka/kafka.go | 22 ++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/docs/user-guide/sinks/kafka.md b/docs/user-guide/sinks/kafka.md index ae4d85f8ef..6837f262ff 100644 --- a/docs/user-guide/sinks/kafka.md +++ b/docs/user-guide/sinks/kafka.md @@ -2,6 +2,16 @@ A `Kafka` sink is used to forward the messages to a Kafka topic. Kafka sink supports configuration overrides. +## Kafka Headers + +We will insert `keys` into the Kafka header, but since `keys` is an array, we will add `keys` into the header in the +following format. + +* `__keys_len` will have the number of `key` in the header. if `__keys_len` == `0`, means no `keys` are present. +* `__keys_%d` will have the `key`, e.g., `__key_0` will be the first key, and so forth. + +## Example + ```yaml spec: vertices: diff --git a/pkg/sinks/kafka/kafka.go b/pkg/sinks/kafka/kafka.go index cfbd2d27ff..b1ba1f05f3 100644 --- a/pkg/sinks/kafka/kafka.go +++ b/pkg/sinks/kafka/kafka.go @@ -150,10 +150,32 @@ func (tk *ToKafka) Write(_ context.Context, messages []isb.Message) ([]isb.Offse } } }() + for index, msg := range messages { + // insert keys in the header. + // since keys is an array, to decompose it, we need len and key at each index. + var headers []sarama.RecordHeader + // insert __key_len + keyLen := sarama.RecordHeader{ + Key: []byte("__key_len"), + Value: []byte(fmt.Sprintf("%d", len(msg.Keys))), + } + headers = append(headers, keyLen) + + // write keys into header if length > 0 + if len(msg.Keys) > 0 { + for idx, key := range msg.Keys { + headers = append(headers, sarama.RecordHeader{ + Key: []byte(fmt.Sprintf("__key_%d", idx)), + Value: []byte(key), + }) + } + } + message := &sarama.ProducerMessage{ Topic: tk.topic, Value: sarama.ByteEncoder(msg.Payload), + Headers: headers, Metadata: index, // Use metadata to identify if it succeeds or fails in the async return. } tk.producer.Input() <- message